if (mdt_object_remote(o)) {
/* This object is located on remote node.*/
- /* Return -EIO for old client */
+ /* Return -ENOTSUPP for old client */
if (!mdt_is_dne_client(req->rq_export))
- GOTO(out, rc = -EIO);
+ GOTO(out, rc = -ENOTSUPP);
repbody->fid1 = *mdt_object_fid(o);
repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->valid);
}
if (ma->ma_valid & MA_LMV) {
+ /* Return -ENOTSUPP for old client */
+ if (!mdt_is_striped_client(req->rq_export))
+ RETURN(-ENOTSUPP);
+
LASSERT(S_ISDIR(la->la_mode));
mdt_dump_lmv(D_INFO, ma->ma_lmv);
repbody->eadatasize = ma->ma_lmv_size;
repbody->valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
}
if (ma->ma_valid & MA_LMV_DEF) {
+ /* Return -ENOTSUPP for old client */
+ if (!mdt_is_striped_client(req->rq_export))
+ RETURN(-ENOTSUPP);
LASSERT(S_ISDIR(la->la_mode));
repbody->eadatasize = ma->ma_lmv_size;
repbody->valid |= (OBD_MD_FLDIREA|OBD_MD_DEFAULT_MEA);
}
+/**
+ * mdt_remote_permission: Check whether the remote operation is permitted,
+ *
+ * Before we implement async cross-MDT updates (DNE phase 2). There are a few
+ * limitations here:
+ *
+ * 1.Only sysadmin can create remote directory and striped directory and
+ * migrate directory now, unless
+ * lctl set_param mdt.*.enable_remote_dir_gid=allow_gid.
+ * 2.Remote directory can only be created on MDT0, unless
+ * lctl set_param mdt.*.enable_remote_dir = 1
+ * 3.Only new clients can access remote dir( >= 2.4) and striped dir(>= 2.6),
+ * old client will return -ENOTSUPP.
+ *
+ * XXX these check are only needed for remote synchronization, once async
+ * update is supported, these check will be removed.
+ *
+ * param[in]info: execution environment.
+ * param[in]parent: the directory of this operation.
+ * param[in]child: the child of this operation.
+ *
+ * retval = 0 remote operation is allowed.
+ * < 0 remote operation is denied.
+ */
+static int mdt_remote_permission(struct mdt_thread_info *info,
+ struct mdt_object *parent,
+ struct mdt_object *child)
+{
+ struct mdt_device *mdt = info->mti_mdt;
+ struct lu_ucred *uc = mdt_ucred(info);
+ struct md_op_spec *spec = &info->mti_spec;
+ struct lu_attr *attr = &info->mti_attr.ma_attr;
+ struct obd_export *exp = mdt_info_req(info)->rq_export;
+
+ /* Only check create remote directory, striped directory and
+ * migration */
+ if (mdt_object_remote(parent) == 0 && mdt_object_remote(child) == 0 &&
+ !(S_ISDIR(attr->la_mode) && spec->u.sp_ea.eadata != NULL &&
+ spec->u.sp_ea.eadatalen != 0) &&
+ info->mti_rr.rr_opcode != REINT_MIGRATE)
+ return 0;
+
+ if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
+ if (uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
+ mdt->mdt_enable_remote_dir_gid != -1)
+ return -EPERM;
+ }
+
+ if (mdt->mdt_enable_remote_dir == 0) {
+ struct seq_server_site *ss = mdt_seq_site(mdt);
+ struct lu_seq_range range = { 0 };
+ int rc;
+
+ fld_range_set_type(&range, LU_SEQ_RANGE_MDT);
+ rc = fld_server_lookup(info->mti_env, ss->ss_server_fld,
+ fid_seq(mdt_object_fid(parent)), &range);
+ if (rc != 0)
+ return rc;
+
+ if (range.lsr_index != 0)
+ return -EPERM;
+ }
+
+ if (!mdt_is_dne_client(exp))
+ return -ENOTSUPP;
+
+ if (S_ISDIR(attr->la_mode) && spec->u.sp_ea.eadata != NULL &&
+ spec->u.sp_ea.eadatalen != 0 && !mdt_is_striped_client(exp))
+ return -ENOTSUPP;
+
+ return 0;
+}
+
/*
* VBR: we save three versions in reply:
* 0 - parent. Check that parent version is the same during replay.
if (likely(!IS_ERR(child))) {
struct md_object *next = mdt_object_child(parent);
- if (mdt_object_remote(child)) {
- struct seq_server_site *ss;
- struct lu_ucred *uc = mdt_ucred(info);
-
- if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
- if (uc->uc_gid !=
- mdt->mdt_enable_remote_dir_gid &&
- mdt->mdt_enable_remote_dir_gid != -1) {
- CERROR("%s: Creating remote dir is only"
- " permitted for administrator or"
- " set mdt_enable_remote_dir_gid:"
- " rc = %d\n",
- mdt_obd_name(mdt), -EPERM);
- GOTO(out_put_child, rc = -EPERM);
- }
- }
-
- ss = mdt_seq_site(mdt);
- if (ss->ss_node_id != 0 &&
- mdt->mdt_enable_remote_dir == 0) {
- CERROR("%s: remote dir is only permitted on"
- " MDT0 or set_param"
- " mdt.*.enable_remote_dir=1\n",
- mdt_obd_name(mdt));
- GOTO(out_put_child, rc = -EPERM);
- }
- if (!mdt_is_dne_client(mdt_info_req(info)->rq_export)) {
- /* Return -EIO for old client */
- GOTO(out_put_child, rc = -EIO);
- }
+ rc = mdt_remote_permission(info, parent, child);
+ if (rc != 0)
+ GOTO(out_put_child, rc);
- }
- ma->ma_need = MA_INODE;
+ ma->ma_need = MA_INODE;
ma->ma_valid = 0;
/* capa for cross-ref will be stored here */
ma->ma_capa = req_capsule_server_get(info->mti_pill,
PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
if (!mdt_is_dne_client(req->rq_export))
- /* Return -EIO for old client */
- GOTO(put_child, rc = -EIO);
+ /* Return -ENOTSUPP for old client */
+ GOTO(put_child, rc = -ENOTSUPP);
if (info->mti_spec.sp_rm_entry) {
struct lu_ucred *uc = mdt_ucred(info);
CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
PNAME(&rr->rr_name), PFID(rr->rr_fid2));
+
/* 1: lock the source dir. */
msrcdir = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
if (IS_ERR(msrcdir)) {
GOTO(out_put_child, rc = -EPERM);
}
+ rc = mdt_remote_permission(info, msrcdir, mold);
+ if (rc != 0)
+ GOTO(out_put_child, rc);
+
/* 3: iterate the linkea of the object and lock all of the objects */
CFS_INIT_LIST_HEAD(&lock_list);
rc = mdt_lock_objects_in_linkea(info, mold, msrcdir, &lock_list);