[MDL_GROUP] = LCK_GROUP
};
-
static struct mdt_device *mdt_dev(struct lu_device *d);
static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
{
lh->mlh_pdo_hash = 0;
lh->mlh_reg_mode = lm;
+ lh->mlh_rreg_mode = lm;
lh->mlh_type = MDT_REG_LOCK;
}
const char *name, int namelen)
{
lh->mlh_reg_mode = lm;
+ lh->mlh_rreg_mode = lm;
lh->mlh_type = MDT_PDO_LOCK;
if (name != NULL && (name[0] != '\0')) {
repbody->valid |= OBD_MD_FLID;
if (mdt->mdt_opts.mo_mds_capa &&
- info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
+ exp_connect_flags(info->mti_exp) & OBD_CONNECT_MDS_CAPA) {
struct mdt_object *root;
struct lustre_capa *capa;
if (fid) {
b->fid1 = *fid;
b->valid |= OBD_MD_FLID;
-
- /* FIXME: these should be fixed when new igif ready.*/
- b->ino = fid_oid(fid); /* 1.6 compatibility */
- b->generation = fid_ver(fid); /* 1.6 compatibility */
- b->valid |= OBD_MD_FLGENER; /* 1.6 compatibility */
-
CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, size="LPU64"\n",
PFID(fid), b->nlink, b->mode, b->size);
}
struct lu_attr *la = &ma->ma_attr;
ENTRY;
- if (exp->exp_connect_flags & OBD_CONNECT_LAYOUTLOCK)
- /* the client can deal with 16-bit lmm_stripe_count */
- RETURN_EXIT;
+ if (exp_connect_layout(exp))
+ /* the client can deal with 16-bit lmm_stripe_count */
+ RETURN_EXIT;
body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
ma->ma_valid = 0;
- rc = mdt_object_exists(o);
- if (rc < 0) {
- /* This object is located on remote node.*/
- repbody->fid1 = *mdt_object_fid(o);
- repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
- GOTO(out, rc = 0);
- }
+ if (mdt_object_remote(o)) {
+ /* This object is located on remote node.*/
+ /* Return -EIO for old client */
+ if (!mdt_is_dne_client(req->rq_export))
+ GOTO(out, rc = -EIO);
+
+ repbody->fid1 = *mdt_object_fid(o);
+ repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
+ GOTO(out, rc = 0);
+ }
buffer->lb_len = reqbody->eadatasize;
if (buffer->lb_len > 0)
} else {
ma->ma_lmm = buffer->lb_buf;
ma->ma_lmm_size = buffer->lb_len;
- ma->ma_need = MA_LOV | MA_INODE;
+ ma->ma_need = MA_LOV | MA_INODE | MA_HSM;
}
if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
}
}
#ifdef CONFIG_FS_POSIX_ACL
- else if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
- (reqbody->valid & OBD_MD_FLACL)) {
+ else if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
+ (reqbody->valid & OBD_MD_FLACL)) {
buffer->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
buffer->lb_len = req_capsule_get_size(pill,
&RMF_ACL, RCL_SERVER);
}
#endif
- if (reqbody->valid & OBD_MD_FLMDSCAPA &&
- info->mti_mdt->mdt_opts.mo_mds_capa &&
- info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
+ if (reqbody->valid & OBD_MD_FLMDSCAPA &&
+ info->mti_mdt->mdt_opts.mo_mds_capa &&
+ exp_connect_flags(info->mti_exp) & OBD_CONNECT_MDS_CAPA) {
struct lustre_capa *capa;
capa = req_capsule_server_get(pill, &RMF_CAPA1);
* return directly, client will find body->valid OBD_MD_FLOSSCAPA
* flag not set.
*/
- if (!obj || !info->mti_mdt->mdt_opts.mo_oss_capa ||
- !(info->mti_exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA))
- RETURN(0);
+ if (!obj || !info->mti_mdt->mdt_opts.mo_oss_capa ||
+ !(exp_connect_flags(info->mti_exp) & OBD_CONNECT_OSS_CAPA))
+ RETURN(0);
body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(body != NULL);
repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
- /*
- * We save last checked parent fid to @repbody->fid1 for remote
- * directory case.
- */
- LASSERT(fid_is_sane(&body->fid2));
- LASSERT(mdt_object_exists(o) > 0);
- rc = mdo_is_subdir(info->mti_env, mdt_object_child(o),
- &body->fid2, &repbody->fid1);
- if (rc == 0 || rc == -EREMOTE)
- repbody->valid |= OBD_MD_FLID;
+ /*
+ * We save last checked parent fid to @repbody->fid1 for remote
+ * directory case.
+ */
+ LASSERT(fid_is_sane(&body->fid2));
+ LASSERT(mdt_object_exists(o) && !mdt_object_remote(o));
+ rc = mdo_is_subdir(info->mti_env, mdt_object_child(o),
+ &body->fid2, &repbody->fid1);
+ if (rc == 0 || rc == -EREMOTE)
+ repbody->valid |= OBD_MD_FLID;
- RETURN(rc);
+ RETURN(rc);
+}
+
+int mdt_swap_layouts(struct mdt_thread_info *info)
+{
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct obd_export *exp = req->rq_export;
+ struct mdt_object *o1, *o2, *o;
+ struct mdt_lock_handle *lh1, *lh2;
+ struct mdc_swap_layouts *msl;
+ int rc;
+ ENTRY;
+
+ /* client does not support layout lock, so layout swaping
+ * is disabled.
+ * FIXME: there is a problem for old clients which don't support
+ * layout lock yet. If those clients have already opened the file
+ * they won't be notified at all so that old layout may still be
+ * used to do IO. This can be fixed after file release is landed by
+ * doing exclusive open and taking full EX ibits lock. - Jinshan */
+ if (!exp_connect_layout(exp))
+ RETURN(-EOPNOTSUPP);
+
+ if (req_capsule_get_size(info->mti_pill, &RMF_CAPA1, RCL_CLIENT))
+ mdt_set_capainfo(info, 0, &info->mti_body->fid1,
+ req_capsule_client_get(info->mti_pill,
+ &RMF_CAPA1));
+
+ if (req_capsule_get_size(info->mti_pill, &RMF_CAPA2, RCL_CLIENT))
+ mdt_set_capainfo(info, 1, &info->mti_body->fid2,
+ req_capsule_client_get(info->mti_pill,
+ &RMF_CAPA2));
+
+ o1 = info->mti_object;
+ o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
+ &info->mti_body->fid2);
+ if (IS_ERR(o))
+ GOTO(out, rc = PTR_ERR(o));
+
+ if (mdt_object_exists(o) < 0) /* remote object */
+ GOTO(put, rc = -ENOENT);
+
+ rc = lu_fid_cmp(&info->mti_body->fid1, &info->mti_body->fid2);
+ if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
+ GOTO(put, rc);
+
+ if (rc < 0)
+ swap(o1, o2);
+
+ /* permission check. Make sure the calling process having permission
+ * to write both files. */
+ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
+ MAY_WRITE);
+ if (rc < 0)
+ GOTO(put, rc);
+
+ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
+ MAY_WRITE);
+ if (rc < 0)
+ GOTO(put, rc);
+
+ msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
+ LASSERT(msl != NULL);
+
+ lh1 = &info->mti_lh[MDT_LH_NEW];
+ mdt_lock_reg_init(lh1, LCK_EX);
+ lh2 = &info->mti_lh[MDT_LH_OLD];
+ mdt_lock_reg_init(lh2, LCK_EX);
+
+ rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT,
+ MDT_LOCAL_LOCK);
+ if (rc < 0)
+ GOTO(put, rc);
+
+ rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT,
+ MDT_LOCAL_LOCK);
+ if (rc < 0)
+ GOTO(unlock1, rc);
+
+ rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
+ mdt_object_child(o2), msl->msl_flags);
+ GOTO(unlock2, rc);
+unlock2:
+ mdt_object_unlock(info, o2, lh2, rc);
+unlock1:
+ mdt_object_unlock(info, o1, lh1, rc);
+put:
+ mdt_object_put(info->mti_env, o);
+out:
+ RETURN(rc);
}
static int mdt_raw_lookup(struct mdt_thread_info *info,
}
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
- rc = mdt_object_exists(parent);
- if (unlikely(rc == 0)) {
+ if (unlikely(!mdt_object_exists(parent))) {
LU_OBJECT_DEBUG(D_INODE, info->mti_env,
&parent->mot_obj.mo_lu,
"Parent doesn't exist!\n");
RETURN(-ESTALE);
} else if (!info->mti_cross_ref) {
- LASSERTF(rc > 0, "Parent "DFID" is on remote server\n",
+ LASSERTF(!mdt_object_remote(parent),
+ "Parent "DFID" is on remote server\n",
PFID(mdt_object_fid(parent)));
}
if (lname) {
* needed here but update is.
*/
child_bits &= ~MDS_INODELOCK_LOOKUP;
- child_bits |= MDS_INODELOCK_UPDATE;
+ child_bits |= MDS_INODELOCK_PERM | MDS_INODELOCK_UPDATE;
- rc = mdt_object_lock(info, child, lhc, child_bits,
+ rc = mdt_object_lock(info, child, lhc, child_bits,
MDT_LOCAL_LOCK);
}
if (rc == 0) {
mdt_lock_handle_init(lhc);
mdt_lock_reg_init(lhc, LCK_PR);
- if (mdt_object_exists(child) == 0) {
- LU_OBJECT_DEBUG(D_INODE, info->mti_env,
- &child->mot_obj.mo_lu,
- "Object doesn't exist!\n");
- GOTO(out_child, rc = -ENOENT);
- }
+ if (!mdt_object_exists(child)) {
+ LU_OBJECT_DEBUG(D_INODE, info->mti_env,
+ &child->mot_obj.mo_lu,
+ "Object doesn't exist!\n");
+ GOTO(out_child, rc = -ENOENT);
+ }
- if (!(child_bits & MDS_INODELOCK_UPDATE)) {
+ if (!(child_bits & MDS_INODELOCK_UPDATE) &&
+ mdt_object_exists(child) && !mdt_object_remote(child)) {
struct md_attr *ma = &info->mti_attr;
ma->ma_valid = 0;
(unsigned long)res_id->name[1],
(unsigned long)res_id->name[2],
PFID(mdt_object_fid(child)));
- mdt_pack_size2body(info, child);
+ if (mdt_object_exists(child) && !mdt_object_remote(child))
+ mdt_pack_size2body(info, child);
}
if (lock)
LDLM_LOCK_PUT(lock);
spin_lock(&req->rq_export->exp_lock);
if (*(__u32 *)val)
- req->rq_export->exp_connect_flags |= OBD_CONNECT_RDONLY;
+ *exp_connect_flags_ptr(req->rq_export) |=
+ OBD_CONNECT_RDONLY;
else
- req->rq_export->exp_connect_flags &=~OBD_CONNECT_RDONLY;
+ *exp_connect_flags_ptr(req->rq_export) &=
+ ~OBD_CONNECT_RDONLY;
spin_unlock(&req->rq_export->exp_lock);
} else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
reply = req_capsule_server_get(info->mti_pill, &RMF_CONNECT_DATA);
exp = req->rq_export;
spin_lock(&exp->exp_lock);
- exp->exp_connect_flags = reply->ocd_connect_flags;
+ *exp_connect_flags_ptr(exp) = reply->ocd_connect_flags;
spin_unlock(&exp->exp_lock);
rc = mdt_init_idmap(info);
int rc;
ENTRY;
- desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
- MDS_BULK_PORTAL);
- if (desc == NULL)
- RETURN(-ENOMEM);
+ desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
+ MDS_BULK_PORTAL);
+ if (desc == NULL)
+ RETURN(-ENOMEM);
- if (!(exp->exp_connect_flags & OBD_CONNECT_BRW_SIZE))
+ if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE))
/* old client requires reply size in it's PAGE_SIZE,
- * which is rdpg->rp_count */
+ * which is rdpg->rp_count */
nob = rdpg->rp_count;
for (i = 0, tmpcount = nob; i < rdpg->rp_npages && tmpcount > 0;
}
rdpg->rp_attrs = reqbody->mode;
- if (info->mti_exp->exp_connect_flags & OBD_CONNECT_64BITHASH)
- rdpg->rp_attrs |= LUDA_64BITHASH;
- rdpg->rp_count = min_t(unsigned int, reqbody->nlink,
- PTLRPC_MAX_BRW_SIZE);
+ if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_64BITHASH)
+ rdpg->rp_attrs |= LUDA_64BITHASH;
+ rdpg->rp_count = min_t(unsigned int, reqbody->nlink,
+ exp_max_brw_size(info->mti_exp));
rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1) >>
CFS_PAGE_SHIFT;
OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
int mdt_reint(struct mdt_thread_info *info)
{
- long opc;
- int rc;
-
- static const struct req_format *reint_fmts[REINT_MAX] = {
- [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
- [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
- [REINT_LINK] = &RQF_MDS_REINT_LINK,
- [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
- [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
- [REINT_OPEN] = &RQF_MDS_REINT_OPEN,
- [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR
- };
+ long opc;
+ int rc;
+
+ static const struct req_format *reint_fmts[REINT_MAX] = {
+ [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
+ [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
+ [REINT_LINK] = &RQF_MDS_REINT_LINK,
+ [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
+ [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
+ [REINT_OPEN] = &RQF_MDS_REINT_OPEN,
+ [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
+ [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK
+ };
ENTRY;
if (req_ii->ii_count <= 0)
GOTO(out, rc = -EFAULT);
rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
- PTLRPC_MAX_BRW_SIZE);
+ exp_max_brw_size(info->mti_exp));
rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE -1) >> CFS_PAGE_SHIFT;
/* allocate pages to store the containers */
RETURN(rc);
}
+int mdt_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+ void *data, int flag)
+{
+ struct lustre_handle lockh;
+ int rc;
+
+ switch (flag) {
+ case LDLM_CB_BLOCKING:
+ ldlm_lock2handle(lock, &lockh);
+ rc = ldlm_cli_cancel(&lockh);
+ if (rc < 0) {
+ CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
+ RETURN(rc);
+ }
+ break;
+ case LDLM_CB_CANCELING:
+ LDLM_DEBUG(lock, "Revoke remote lock\n");
+ break;
+ default:
+ LBUG();
+ }
+ RETURN(0);
+}
+
+int mdt_remote_object_lock(struct mdt_thread_info *mti,
+ struct mdt_object *o, struct lustre_handle *lh,
+ ldlm_mode_t mode, __u64 ibits)
+{
+ struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
+ ldlm_policy_data_t *policy = &mti->mti_policy;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(mdt_object_remote(o));
+
+ LASSERT((ibits & MDS_INODELOCK_UPDATE));
+
+ memset(einfo, 0, sizeof(*einfo));
+ einfo->ei_type = LDLM_IBITS;
+ einfo->ei_mode = mode;
+ einfo->ei_cb_bl = mdt_md_blocking_ast;
+ einfo->ei_cb_cp = ldlm_completion_ast;
+
+ memset(policy, 0, sizeof(*policy));
+ policy->l_inodebits.bits = ibits;
+
+ rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
+ policy);
+ RETURN(rc);
+}
+
static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
struct mdt_lock_handle *lh, __u64 ibits,
bool nonblock, int locality)
LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
LASSERT(lh->mlh_type != MDT_NUL_LOCK);
- if (mdt_object_exists(o) < 0) {
+ if (mdt_object_remote(o)) {
if (locality == MDT_CROSS_LOCK) {
- /* cross-ref object fix */
- ibits &= ~MDS_INODELOCK_UPDATE;
+ ibits &= ~(MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM);
ibits |= MDS_INODELOCK_LOOKUP;
} else {
- LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
+ LASSERTF(!(ibits &
+ (MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM)),
+ "%s: wrong bit "LPX64" for remote obj "DFID"\n",
+ mdt_obd_name(info->mti_mdt), ibits,
+ PFID(mdt_object_fid(o)));
LASSERT(ibits & MDS_INODELOCK_LOOKUP);
}
/* No PDO lock on remote object */
LASSERT(lh->mlh_type != MDT_PDO_LOCK);
}
- if (lh->mlh_type == MDT_PDO_LOCK) {
+ if (lh->mlh_type == MDT_PDO_LOCK) {
/* check for exists after object is locked */
if (mdt_object_exists(o) == 0) {
/* Non-existent object shouldn't have PDO lock */
mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
+ if (lustre_handle_is_used(&lh->mlh_rreg_lh))
+ ldlm_lock_decref(&lh->mlh_rreg_lh, lh->mlh_rreg_mode);
+
EXIT;
}
mdt_object_put(info->mti_env, o);
}
-static struct mdt_handler *mdt_handler_find(__u32 opc,
- struct mdt_opc_slice *supported)
+struct mdt_handler *mdt_handler_find(__u32 opc, struct mdt_opc_slice *supported)
{
struct mdt_opc_slice *s;
struct mdt_handler *h;
rc = mdt_unpack_req_pack_rep(info, flags);
}
- if (rc == 0 && flags & MUTABOR &&
- req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
- /* should it be rq_status? */
- rc = -EROFS;
+ if (rc == 0 && flags & MUTABOR &&
+ exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+ /* should it be rq_status? */
+ rc = -EROFS;
if (rc == 0 && flags & HABEO_CLAVIS) {
struct ldlm_request *dlm_req;
lh->mlh_reg_mode = LCK_MINMODE;
lh->mlh_pdo_lh.cookie = 0ull;
lh->mlh_pdo_mode = LCK_MINMODE;
+ lh->mlh_rreg_lh.cookie = 0ull;
+ lh->mlh_rreg_mode = LCK_MINMODE;
}
void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
case MDS_SETXATTR:
case MDS_SET_INFO:
case MDS_GET_INFO:
+ case MDS_HSM_PROGRESS:
+ case MDS_HSM_REQUEST:
+ case MDS_HSM_CT_REGISTER:
+ case MDS_HSM_CT_UNREGISTER:
+ case MDS_HSM_STATE_GET:
+ case MDS_HSM_STATE_SET:
+ case MDS_HSM_ACTION:
case MDS_QUOTACHECK:
case MDS_QUOTACTL:
+ case UPDATE_OBJ:
+ case MDS_SWAP_LAYOUTS:
case QUOTA_DQACQ:
case QUOTA_DQREL:
case SEQ_QUERY:
switch (opcode) {
case MDT_IT_LOOKUP:
- child_bits = MDS_INODELOCK_LOOKUP;
+ child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
break;
case MDT_IT_GETATTR:
- child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
+ child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
+ MDS_INODELOCK_PERM;
break;
default:
CERROR("Unsupported intent (%d)\n", opcode);
rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
if (rc == 0) {
struct ptlrpc_request *req = mdt_info_req(info);
- if (flv->it_flags & MUTABOR &&
- req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
- RETURN(-EROFS);
+ if (flv->it_flags & MUTABOR &&
+ exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+ RETURN(-EROFS);
}
if (rc == 0 && flv->it_act != NULL) {
/* execute policy */
}
/*
- * Init client sequence manager which is used by local MDS to talk to sequence
- * controller on remote node.
- */
-static int mdt_seq_init_cli(const struct lu_env *env,
- struct mdt_device *m,
- struct lustre_cfg *cfg)
-{
- struct seq_server_site *ss = mdt_seq_site(m);
- struct obd_device *mdc;
- int rc;
- int index;
- struct mdt_thread_info *info;
- char *p, *index_string = lustre_cfg_string(cfg, 2);
- ENTRY;
-
- info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
-
- LASSERT(index_string);
- index = simple_strtol(index_string, &p, 10);
- if (*p) {
- CERROR("%s: Invalid index in lustre_cgf, offset 2\n",
- mdt2obd_dev(m)->obd_name);
- RETURN(-EINVAL);
- }
-
- /* check if this is adding the first MDC and controller is not yet
- * initialized. */
- if (index != 0 || ss->ss_client_seq)
- RETURN(0);
-
- mdc = class_name2obd(lustre_cfg_string(cfg, 1));
- if (!mdc) {
- CERROR("%s: can't find %s device\n",
- mdt2obd_dev(m)->obd_name, lustre_cfg_string(cfg, 1));
- rc = -ENOENT;
- } else if (!mdc->obd_set_up) {
- CERROR("%s: target %s not set up\n",
- mdt2obd_dev(m)->obd_name, mdc->obd_name);
- rc = -EINVAL;
- } else {
- LASSERT(ss->ss_control_exp);
- OBD_ALLOC_PTR(ss->ss_client_seq);
- if (ss->ss_client_seq != NULL) {
- char *prefix;
-
- OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
- if (!prefix)
- RETURN(-ENOMEM);
-
- snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
- mdc->obd_name);
-
- rc = seq_client_init(ss->ss_client_seq,
- ss->ss_control_exp,
- LUSTRE_SEQ_METADATA,
- prefix, NULL);
- OBD_FREE(prefix, MAX_OBD_NAME + 5);
- } else
- rc = -ENOMEM;
-
- if (rc)
- RETURN(rc);
-
- LASSERT(ss->ss_server_seq != NULL);
- rc = seq_server_set_cli(ss->ss_server_seq, ss->ss_client_seq,
- env);
- }
-
- RETURN(rc);
-}
-
-static void mdt_seq_fini_cli(struct mdt_device *m)
-{
- struct seq_server_site *ss;
-
- ENTRY;
-
- ss = mdt_seq_site(m);
-
- if (ss == NULL)
- RETURN_EXIT;
-
- if (ss->ss_server_seq)
- seq_server_set_cli(ss->ss_server_seq, NULL, NULL);
-
- if (ss->ss_control_exp) {
- class_export_put(ss->ss_control_exp);
- ss->ss_control_exp = NULL;
- }
-
- EXIT;
-}
-
-/*
* FLD wrappers
*/
static int mdt_fld_fini(const struct lu_env *env,
RETURN(0);
}
+static void mdt_stack_pre_fini(const struct lu_env *env,
+ struct mdt_device *m, struct lu_device *top)
+{
+ struct obd_device *obd = mdt2obd_dev(m);
+ struct lustre_cfg_bufs *bufs;
+ struct lustre_cfg *lcfg;
+ struct mdt_thread_info *info;
+ ENTRY;
+
+ LASSERT(top);
+
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ LASSERT(info != NULL);
+
+ bufs = &info->mti_u.bufs;
+
+ LASSERT(m->mdt_child_exp);
+ LASSERT(m->mdt_child_exp->exp_obd);
+ obd = m->mdt_child_exp->exp_obd;
+
+ /* process cleanup, pass mdt obd name to get obd umount flags */
+ /* XXX: this is needed because all layers are referenced by
+ * objects (some of them are pinned by osd, for example *
+ * the proper solution should be a model where object used
+ * by osd only doesn't have mdt/mdd slices -bzzz */
+ lustre_cfg_bufs_reset(bufs, obd->obd_name);
+ lustre_cfg_bufs_set_string(bufs, 1, NULL);
+ lcfg = lustre_cfg_new(LCFG_PRE_CLEANUP, bufs);
+ if (!lcfg) {
+ CERROR("%s:Cannot alloc lcfg!\n", mdt_obd_name(m));
+ return;
+ }
+ top->ld_ops->ldo_process_config(env, top, lcfg);
+ lustre_cfg_free(lcfg);
+ EXIT;
+}
+
static void mdt_stack_fini(const struct lu_env *env,
struct mdt_device *m, struct lu_device *top)
{
ping_evictor_stop();
+
+ mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
obd_exports_barrier(obd);
obd_zombie_barrier();
m->mdt_nosquash_strlen = 0;
}
+ next->md_ops->mdo_iocontrol(env, next, OBD_IOC_PAUSE_LFSCK,
+ 0, NULL);
mdt_seq_fini(env, m);
- mdt_seq_fini_cli(m);
mdt_fld_fini(env, m);
sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
init_rwsem(&m->mdt_squash_sem);
spin_lock_init(&m->mdt_osfs_lock);
m->mdt_osfs_age = cfs_time_shift_64(-1000);
+ m->mdt_enable_remote_dir = 0;
m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
m->mdt_md_dev.md_lu_dev.ld_obd = obd;
if (rc)
GOTO(err_fini_stack, rc);
+ rc = mdt_fld_init(env, obd->obd_name, m);
+ if (rc)
+ GOTO(err_lut, rc);
+
+ rc = mdt_seq_init(env, obd->obd_name, m);
+ if (rc)
+ GOTO(err_fini_fld, rc);
+
snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
LUSTRE_MDT_NAME"-%p", m);
m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name,
ldlm_namespace_free(m->mdt_namespace, NULL, 0);
obd->obd_namespace = m->mdt_namespace = NULL;
err_fini_seq:
- mdt_seq_fini(env, m);
- mdt_fld_fini(env, m);
- tgt_fini(env, &m->mdt_lut);
+ mdt_seq_fini(env, m);
+err_fini_fld:
+ mdt_fld_fini(env, m);
+err_lut:
+ tgt_fini(env, &m->mdt_lut);
err_fini_stack:
mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
err_lmi:
break;
}
- case LCFG_ADD_MDC:
- /*
- * Add mdc hook to get first MDT uuid and connect it to
- * ls->controller to use for seq manager.
- */
- rc = next->ld_ops->ldo_process_config(env, next, cfg);
- if (rc)
- CERROR("Can't add mdc, rc %d\n", rc);
- else
- rc = mdt_seq_init_cli(env, mdt_dev(d), cfg);
- break;
default:
/* others are passed further */
rc = next->ld_ops->ldo_process_config(env, next, cfg);
if (rc)
RETURN(rc);
- rc = mdt_fld_init(env, obd->obd_name, mdt);
- if (rc)
- RETURN(rc);
-
- rc = mdt_seq_init(env, obd->obd_name, mdt);
- if (rc)
- RETURN(rc);
+ rc = mdt->mdt_child->md_ops->mdo_iocontrol(env, mdt->mdt_child,
+ OBD_IOC_START_LFSCK,
+ 0, NULL);
+ if (rc != 0)
+ CWARN("Fail to auto trigger paused LFSCK.\n");
rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child,
&mdt->mdt_md_root_fid);
* Compute the compatibility flags for a connection request based on
* features mutually supported by client and server.
*
- * The obd_export::exp_connect_flags field in \a exp must not be updated
- * here, otherwise a partially initialized value may be exposed. After
- * the connection request is successfully processed, the top-level MDT
- * connect request handler atomically updates the export connect flags
- * from the obd_connect_data::ocd_connect_flags field of the reply.
- * \see mdt_connect().
+ * The obd_export::exp_connect_data.ocd_connect_flags field in \a exp
+ * must not be updated here, otherwise a partially initialized value may
+ * be exposed. After the connection request is successfully processed,
+ * the top-level MDT connect request handler atomically updates the export
+ * connect flags from the obd_connect_data::ocd_connect_flags field of the
+ * reply. \see mdt_connect().
*
* \param exp the obd_export associated with this client/target pair
* \param mdt the target device for the connection
if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
data->ocd_brw_size = min(data->ocd_brw_size,
- (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT));
+ (__u32)MD_MAX_BRW_SIZE);
if (data->ocd_brw_size == 0) {
CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
" ocd_version: %x ocd_grant: %d "
}
}
- /* NB: Disregard the rule against updating exp_connect_flags in this
- * case, since tgt_client_new() needs to know if this is a lightweight
- * connection, and it is safe to expose this flag before connection
- * processing completes. */
+ /* NB: Disregard the rule against updating
+ * exp_connect_data.ocd_connect_flags in this case, since
+ * tgt_client_new() needs to know if this is a lightweight
+ * connection, and it is safe to expose this flag before
+ * connection processing completes. */
if (data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) {
spin_lock(&exp->exp_lock);
- exp->exp_connect_flags |= OBD_CONNECT_LIGHTWEIGHT;
+ *exp_connect_flags_ptr(exp) |= OBD_CONNECT_LIGHTWEIGHT;
spin_unlock(&exp->exp_lock);
}
data->ocd_version = LUSTRE_VERSION_CODE;
+ exp->exp_connect_data = *data;
exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
if ((data->ocd_connect_flags & OBD_CONNECT_FID) == 0) {
return -EBADE;
}
+ if (data->ocd_connect_flags & OBD_CONNECT_PINGLESS) {
+ if (suppress_pings) {
+ spin_lock(&exp->exp_obd->obd_dev_lock);
+ list_del_init(&exp->exp_obd_chain_timed);
+ spin_unlock(&exp->exp_obd->obd_dev_lock);
+ } else {
+ data->ocd_connect_flags &= ~OBD_CONNECT_PINGLESS;
+ }
+ }
+
return 0;
}
* XXX: probably not very appropriate method is used now
* at some point we should find a better one
*/
- if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state)) {
+ if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state) &&
+ !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
rc = obd_health_check(env, mdt->mdt_child_exp->exp_obd);
if (rc)
RETURN(-EAGAIN);
if (!fid_is_sane(&fp->gf_fid))
RETURN(-EINVAL);
- if (!fid_is_norm(&fp->gf_fid) && !fid_is_igif(&fp->gf_fid)) {
+ if (!fid_is_client_mdt_visible(&fp->gf_fid)) {
CWARN("%s: "DFID" is invalid, sequence should be "
">= "LPX64"\n", obd->obd_name,
PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL);
RETURN(-EINVAL);
}
- rc = lu_object_exists(&obj->mot_obj.mo_lu);
- if (rc <= 0) {
- if (rc == -1)
- rc = -EREMOTE;
- else
- rc = -ENOENT;
+ if (mdt_object_remote(obj))
+ rc = -EREMOTE;
+ else if (!mdt_object_exists(obj))
+ rc = -ENOENT;
+
+ if (rc < 0) {
mdt_object_put(env, obj);
CDEBUG(D_IOCTL, "nonlocal object "DFID": %d\n",
PFID(&fp->gf_fid), rc);
if (IS_ERR(obj))
RETURN(PTR_ERR(obj));
- rc = mdt_object_exists(obj);
- if (rc < 0) {
- rc = -EREMOTE;
- /**
- * before calling version get the correct MDS should be
- * fid, this is error to find remote object here
- */
- CERROR("nonlocal object "DFID"\n", PFID(fid));
- } else if (rc == 0) {
- *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
- rc = -ENOENT;
- } else {
- version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
- *(__u64 *)data->ioc_inlbuf2 = version;
- rc = 0;
- }
- mdt_object_unlock_put(mti, obj, lh, 1);
- RETURN(rc);
+ if (mdt_object_remote(obj)) {
+ rc = -EREMOTE;
+ /**
+ * before calling version get the correct MDS should be
+ * fid, this is error to find remote object here
+ */
+ CERROR("nonlocal object "DFID"\n", PFID(fid));
+ } else if (!mdt_object_exists(obj)) {
+ *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
+ rc = -ENOENT;
+ } else {
+ version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
+ *(__u64 *)data->ioc_inlbuf2 = version;
+ rc = 0;
+ }
+ mdt_object_unlock_put(mti, obj, lh, 1);
+ RETURN(rc);
}
/* ioctls on obd dev */
return rc;
}
-/**
- * Send a copytool req to a client
- * Note this sends a request RPC from a server (MDT) to a client (MDC),
- * backwards of normal comms.
- */
-int mdt_hsm_copytool_send(struct obd_export *exp)
-{
- struct kuc_hdr *lh;
- struct hsm_action_list *hal;
- struct hsm_action_item *hai;
- int rc, len;
- ENTRY;
-
- CWARN("%s: writing to mdc at %s\n", exp->exp_obd->obd_name,
- libcfs_nid2str(exp->exp_connection->c_peer.nid));
-
- len = sizeof(*lh) + sizeof(*hal) + MTI_NAME_MAXLEN +
- /* for mockup below */ 2 * cfs_size_round(sizeof(*hai));
- OBD_ALLOC(lh, len);
- if (lh == NULL)
- RETURN(-ENOMEM);
-
- lh->kuc_magic = KUC_MAGIC;
- lh->kuc_transport = KUC_TRANSPORT_HSM;
- lh->kuc_msgtype = HMT_ACTION_LIST;
- lh->kuc_msglen = len;
-
- hal = (struct hsm_action_list *)(lh + 1);
- hal->hal_version = HAL_VERSION;
- hal->hal_archive_num = 1;
- obd_uuid2fsname(hal->hal_fsname, exp->exp_obd->obd_name,
- MTI_NAME_MAXLEN);
-
- /* mock up an action list */
- hal->hal_count = 2;
- hai = hai_zero(hal);
- hai->hai_action = HSMA_ARCHIVE;
- hai->hai_fid.f_oid = 0xA00A;
- hai->hai_len = sizeof(*hai);
- hai = hai_next(hai);
- hai->hai_action = HSMA_RESTORE;
- hai->hai_fid.f_oid = 0xB00B;
- hai->hai_len = sizeof(*hai);
-
- /* Uses the ldlm reverse import; this rpc will be seen by
- the ldlm_callback_handler */
- rc = do_set_info_async(exp->exp_imp_reverse,
- LDLM_SET_INFO, LUSTRE_OBD_VERSION,
- sizeof(KEY_HSM_COPYTOOL_SEND),
- KEY_HSM_COPYTOOL_SEND,
- len, lh, NULL);
-
- OBD_FREE(lh, len);
-
- RETURN(rc);
-}
-
static struct obd_ops mdt_obd_device_ops = {
.o_owner = THIS_MODULE,
.o_set_info_async = mdt_obd_set_info_async,