struct lu_attr *la = &ma->ma_attr;
ENTRY;
- if (exp_connect_flags(exp) & OBD_CONNECT_LAYOUTLOCK)
+ if (exp_connect_layout(exp))
/* the client can deal with 16-bit lmm_stripe_count */
RETURN_EXIT;
if (mdt_object_remote(o)) {
/* This object is located on remote node.*/
+ /* Return -EIO for old client */
+ if (!mdt_is_dne_client(req->rq_export))
+ GOTO(out, rc = -EIO);
+
repbody->fid1 = *mdt_object_fid(o);
repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
GOTO(out, rc = 0);
RETURN(rc);
}
+int mdt_swap_layouts(struct mdt_thread_info *info)
+{
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct obd_export *exp = req->rq_export;
+ struct mdt_object *o1, *o2, *o;
+ struct mdt_lock_handle *lh1, *lh2;
+ struct mdc_swap_layouts *msl;
+ int rc;
+ ENTRY;
+
+ /* client does not support layout lock, so layout swaping
+ * is disabled.
+ * FIXME: there is a problem for old clients which don't support
+ * layout lock yet. If those clients have already opened the file
+ * they won't be notified at all so that old layout may still be
+ * used to do IO. This can be fixed after file release is landed by
+ * doing exclusive open and taking full EX ibits lock. - Jinshan */
+ if (!exp_connect_layout(exp))
+ RETURN(-EOPNOTSUPP);
+
+ if (req_capsule_get_size(info->mti_pill, &RMF_CAPA1, RCL_CLIENT))
+ mdt_set_capainfo(info, 0, &info->mti_body->fid1,
+ req_capsule_client_get(info->mti_pill,
+ &RMF_CAPA1));
+
+ if (req_capsule_get_size(info->mti_pill, &RMF_CAPA2, RCL_CLIENT))
+ mdt_set_capainfo(info, 1, &info->mti_body->fid2,
+ req_capsule_client_get(info->mti_pill,
+ &RMF_CAPA2));
+
+ o1 = info->mti_object;
+ o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
+ &info->mti_body->fid2);
+ if (IS_ERR(o))
+ GOTO(out, rc = PTR_ERR(o));
+
+ if (mdt_object_exists(o) < 0) /* remote object */
+ GOTO(put, rc = -ENOENT);
+
+ rc = lu_fid_cmp(&info->mti_body->fid1, &info->mti_body->fid2);
+ if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
+ GOTO(put, rc);
+
+ if (rc < 0)
+ swap(o1, o2);
+
+ /* permission check. Make sure the calling process having permission
+ * to write both files. */
+ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
+ MAY_WRITE);
+ if (rc < 0)
+ GOTO(put, rc);
+
+ rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
+ MAY_WRITE);
+ if (rc < 0)
+ GOTO(put, rc);
+
+ msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
+ LASSERT(msl != NULL);
+
+ lh1 = &info->mti_lh[MDT_LH_NEW];
+ mdt_lock_reg_init(lh1, LCK_EX);
+ lh2 = &info->mti_lh[MDT_LH_OLD];
+ mdt_lock_reg_init(lh2, LCK_EX);
+
+ rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT,
+ MDT_LOCAL_LOCK);
+ if (rc < 0)
+ GOTO(put, rc);
+
+ rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT,
+ MDT_LOCAL_LOCK);
+ if (rc < 0)
+ GOTO(unlock1, rc);
+
+ rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
+ mdt_object_child(o2), msl->msl_flags);
+ GOTO(unlock2, rc);
+unlock2:
+ mdt_object_unlock(info, o2, lh2, rc);
+unlock1:
+ mdt_object_unlock(info, o1, lh1, rc);
+put:
+ mdt_object_put(info->mti_env, o);
+out:
+ RETURN(rc);
+}
+
static int mdt_raw_lookup(struct mdt_thread_info *info,
struct mdt_object *parent,
const struct lu_name *lname,
int rc;
ENTRY;
- desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
- MDS_BULK_PORTAL);
- if (desc == NULL)
- RETURN(-ENOMEM);
+ desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, 1, BULK_PUT_SOURCE,
+ MDS_BULK_PORTAL);
+ if (desc == NULL)
+ RETURN(-ENOMEM);
if (!(exp_connect_flags(exp) & OBD_CONNECT_BRW_SIZE))
/* old client requires reply size in it's PAGE_SIZE,
if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_64BITHASH)
rdpg->rp_attrs |= LUDA_64BITHASH;
rdpg->rp_count = min_t(unsigned int, reqbody->nlink,
- exp_brw_size(info->mti_exp));
+ exp_max_brw_size(info->mti_exp));
rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1) >>
CFS_PAGE_SHIFT;
OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
if (req_ii->ii_count <= 0)
GOTO(out, rc = -EFAULT);
rdpg->rp_count = min_t(unsigned int, req_ii->ii_count << LU_PAGE_SHIFT,
- exp_brw_size(info->mti_exp));
+ exp_max_brw_size(info->mti_exp));
rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE -1) >> CFS_PAGE_SHIFT;
/* allocate pages to store the containers */
case MDS_QUOTACHECK:
case MDS_QUOTACTL:
case UPDATE_OBJ:
+ case MDS_SWAP_LAYOUTS:
case QUOTA_DQACQ:
case QUOTA_DQREL:
case SEQ_QUERY:
m->mdt_nosquash_strlen = 0;
}
+ next->md_ops->mdo_iocontrol(env, next, OBD_IOC_PAUSE_LFSCK,
+ 0, NULL);
mdt_seq_fini(env, m);
mdt_fld_fini(env, m);
sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
if (rc)
RETURN(rc);
+ rc = mdt->mdt_child->md_ops->mdo_iocontrol(env, mdt->mdt_child,
+ OBD_IOC_START_LFSCK,
+ 0, NULL);
+ if (rc != 0)
+ CWARN("Fail to auto trigger paused LFSCK.\n");
+
rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child,
&mdt->mdt_md_root_fid);
if (rc)
RETURN(-EINVAL);
}
- rc = lu_object_exists(&obj->mot_obj.mo_lu);
- if (rc <= 0) {
- if (rc == -1)
- rc = -EREMOTE;
- else
- rc = -ENOENT;
+ if (mdt_object_remote(obj))
+ rc = -EREMOTE;
+ else if (!mdt_object_exists(obj))
+ rc = -ENOENT;
+
+ if (rc < 0) {
mdt_object_put(env, obj);
CDEBUG(D_IOCTL, "nonlocal object "DFID": %d\n",
PFID(&fp->gf_fid), rc);