#include <lustre_acl.h>
#include <lustre_param.h>
#include <lustre_quota.h>
+#include <lustre_linkea.h>
mdl_mode_t mdt_mdl_lock_modes[] = {
[LCK_MINMODE] = MDL_MINMODE,
static struct mdt_device *mdt_dev(struct lu_device *d);
static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
-static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
- struct getinfo_fid2path *fp);
static const struct lu_object_operations mdt_obj_ops;
int mdt_getstatus(struct mdt_thread_info *info)
{
- struct mdt_device *mdt = info->mti_mdt;
- struct md_device *next = mdt->mdt_child;
- struct mdt_body *repbody;
- int rc;
-
- ENTRY;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct mdt_body *repbody;
+ int rc;
+ ENTRY;
rc = mdt_check_ucred(info);
if (rc)
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
RETURN(err_serious(-ENOMEM));
- repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
- rc = next->md_ops->mdo_root_get(info->mti_env, next, &repbody->fid1);
- if (rc != 0)
- RETURN(rc);
-
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+ repbody->fid1 = mdt->mdt_md_root_fid;
repbody->valid |= OBD_MD_FLID;
if (mdt->mdt_opts.mo_mds_capa &&
switch (flag) {
case LDLM_CB_BLOCKING:
ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
+ rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
if (rc < 0) {
CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
RETURN(rc);
info->mti_mos = NULL;
memset(&info->mti_attr, 0, sizeof(info->mti_attr));
- info->mti_body = NULL;
+ info->mti_big_buf = LU_BUF_NULL;
+ info->mti_body = NULL;
info->mti_object = NULL;
info->mti_dlm_req = NULL;
info->mti_has_trans = 0;
static void mdt_thread_info_fini(struct mdt_thread_info *info)
{
- int i;
+ int i;
- req_capsule_fini(info->mti_pill);
- if (info->mti_object != NULL) {
- mdt_object_put(info->mti_env, info->mti_object);
- info->mti_object = NULL;
- }
- for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
- mdt_lock_handle_fini(&info->mti_lh[i]);
- info->mti_env = NULL;
+ req_capsule_fini(info->mti_pill);
+ if (info->mti_object != NULL) {
+ mdt_object_put(info->mti_env, info->mti_object);
+ info->mti_object = NULL;
+ }
+
+ for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
+ mdt_lock_handle_fini(&info->mti_lh[i]);
+ info->mti_env = NULL;
+
+ if (unlikely(info->mti_big_buf.lb_buf != NULL))
+ lu_buf_free(&info->mti_big_buf);
}
static int mdt_filter_recovery_request(struct ptlrpc_request *req,
case MDS_SYNC: /* used in unmounting */
case OBD_PING:
case MDS_REINT:
+ case UPDATE_OBJ:
case SEQ_QUERY:
case FLD_QUERY:
case LDLM_ENQUEUE:
if (rc != 0)
mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
- /* Cross-ref case, the lock should be returned to the client */
- if (rc == -EREMOTE) {
- LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh));
- rep->lock_policy_res2 = 0;
- rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
- RETURN(rc);
- }
- rep->lock_policy_res2 = clear_serious(rc);
+ /* the open lock or the lock for cross-ref object should be
+ * returned to the client */
+ if (rc == -EREMOTE || mdt_get_disposition(rep, DISP_OPEN_LOCK)) {
+ LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh));
+ rep->lock_policy_res2 = 0;
+ rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
+ RETURN(rc);
+ }
+
+ rep->lock_policy_res2 = clear_serious(rc);
if (rep->lock_policy_res2 == -ENOENT &&
mdt_get_disposition(rep, DISP_LOOKUP_NEG))
if (rc != 0)
CWARN("Fail to auto trigger paused LFSCK.\n");
- rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child,
- &mdt->mdt_md_root_fid);
- if (rc)
- RETURN(rc);
-
+ if (mdt->mdt_seq_site.ss_node_id == 0) {
+ rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child,
+ &mdt->mdt_md_root_fid);
+ if (rc)
+ RETURN(rc);
+ }
LASSERT(!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state));
target_recovery_init(&mdt->mdt_lut, mdt_recovery_handle);
set_bit(MDT_FL_CFGLOG, &mdt->mdt_state);
RETURN(0);
}
-static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key,
- void *val, int vallen)
+/** The maximum depth that fid2path() will search.
+ * This is limited only because we want to store the fids for
+ * historical path lookup purposes.
+ */
+#define MAX_PATH_DEPTH 100
+
+/** mdt_path() lookup structure. */
+struct path_lookup_info {
+ __u64 pli_recno; /**< history point */
+ __u64 pli_currec; /**< current record */
+ struct lu_fid pli_fid;
+ struct lu_fid pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
+ struct mdt_object *pli_mdt_obj;
+ char *pli_path; /**< full path */
+ int pli_pathlen;
+ int pli_linkno; /**< which hardlink to follow */
+ int pli_fidcount; /**< number of \a pli_fids */
+};
+
+static int mdt_links_read(struct mdt_thread_info *info,
+ struct mdt_object *mdt_obj, struct linkea_data *ldata)
{
- struct mdt_device *mdt = mdt_dev(info->mti_exp->exp_obd->obd_lu_dev);
- struct getinfo_fid2path *fpout, *fpin;
- int rc = 0;
+ int rc;
+
+ LASSERT(ldata->ld_buf->lb_buf != NULL);
+
+ if (!mdt_object_exists(mdt_obj))
+ return -ENODATA;
+
+ rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
+ ldata->ld_buf, XATTR_NAME_LINK);
+ if (rc == -ERANGE) {
+ /* Buf was too small, figure out what we need. */
+ lu_buf_free(ldata->ld_buf);
+ rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
+ ldata->ld_buf, XATTR_NAME_LINK);
+ if (rc < 0)
+ return rc;
+ ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
+ if (ldata->ld_buf->lb_buf == NULL)
+ return -ENOMEM;
+ rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
+ ldata->ld_buf, XATTR_NAME_LINK);
+ }
+ if (rc < 0)
+ return rc;
- fpin = key + cfs_size_round(sizeof(KEY_FID2PATH));
- fpout = val;
+ linkea_init(ldata);
- if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
- lustre_swab_fid2path(fpin);
+ return 0;
+}
- memcpy(fpout, fpin, sizeof(*fpin));
- if (fpout->gf_pathlen != vallen - sizeof(*fpin))
- RETURN(-EINVAL);
+static int mdt_path_current(struct mdt_thread_info *info,
+ struct path_lookup_info *pli)
+{
+ struct mdt_device *mdt = info->mti_mdt;
+ struct mdt_object *mdt_obj;
+ struct link_ea_header *leh;
+ struct link_ea_entry *lee;
+ struct lu_name *tmpname = &info->mti_name;
+ struct lu_fid *tmpfid = &info->mti_tmp_fid1;
+ struct lu_buf *buf = &info->mti_big_buf;
+ char *ptr;
+ int reclen;
+ struct linkea_data ldata = { 0 };
+ int rc;
+ ENTRY;
- rc = mdt_fid2path(info->mti_env, mdt, fpout);
- RETURN(rc);
+ /* temp buffer for path element, the buffer will be finally freed
+ * in mdt_thread_info_fini */
+ buf = lu_buf_check_and_alloc(buf, PATH_MAX);
+ if (buf->lb_buf == NULL)
+ RETURN(-ENOMEM);
+
+ ldata.ld_buf = buf;
+ ptr = pli->pli_path + pli->pli_pathlen - 1;
+ *ptr = 0;
+ --ptr;
+ pli->pli_fidcount = 0;
+ pli->pli_fids[0] = *(struct lu_fid *)mdt_object_fid(pli->pli_mdt_obj);
+
+ /* root FID only exists on MDT0, and fid2path should also ends at MDT0,
+ * so checking root_fid can only happen on MDT0. */
+ while (!lu_fid_eq(&mdt->mdt_md_root_fid,
+ &pli->pli_fids[pli->pli_fidcount])) {
+ mdt_obj = mdt_object_find(info->mti_env, mdt,
+ &pli->pli_fids[pli->pli_fidcount]);
+ if (IS_ERR(mdt_obj))
+ GOTO(out, rc = PTR_ERR(mdt_obj));
+ if (mdt_object_remote(mdt_obj)) {
+ mdt_object_put(info->mti_env, mdt_obj);
+ GOTO(remote_out, rc = -EREMOTE);
+ }
+ if (!mdt_object_exists(mdt_obj)) {
+ mdt_object_put(info->mti_env, mdt_obj);
+ GOTO(out, rc = -ENOENT);
+ }
+
+ rc = mdt_links_read(info, mdt_obj, &ldata);
+ mdt_object_put(info->mti_env, mdt_obj);
+ if (rc != 0)
+ GOTO(out, rc = PTR_ERR(buf));
+
+ leh = buf->lb_buf;
+ lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
+ linkea_entry_unpack(lee, &reclen, tmpname, tmpfid);
+ /* If set, use link #linkno for path lookup, otherwise use
+ link #0. Only do this for the final path element. */
+ if ((pli->pli_fidcount == 0) &&
+ (pli->pli_linkno < leh->leh_reccount)) {
+ int count;
+ for (count = 0; count < pli->pli_linkno; count++) {
+ lee = (struct link_ea_entry *)
+ ((char *)lee + reclen);
+ linkea_entry_unpack(lee, &reclen, tmpname,
+ tmpfid);
+ }
+ if (pli->pli_linkno < leh->leh_reccount - 1)
+ /* indicate to user there are more links */
+ pli->pli_linkno++;
+ }
+
+ /* Pack the name in the end of the buffer */
+ ptr -= tmpname->ln_namelen;
+ if (ptr - 1 <= pli->pli_path)
+ GOTO(out, rc = -EOVERFLOW);
+ strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
+ *(--ptr) = '/';
+
+ /* Store the parent fid for historic lookup */
+ if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
+ GOTO(out, rc = -EOVERFLOW);
+ pli->pli_fids[pli->pli_fidcount] = *tmpfid;
+ }
+
+remote_out:
+ ptr++; /* skip leading / */
+ memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
+
+ EXIT;
+out:
+ return rc;
}
-static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
- struct getinfo_fid2path *fp)
+/* Returns the full path to this fid, as of changelog record recno. */
+static int mdt_path(struct mdt_thread_info *info, struct mdt_object *obj,
+ char *path, int pathlen, __u64 *recno, int *linkno,
+ struct lu_fid *fid)
{
- struct mdt_object *obj;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct path_lookup_info *pli;
+ int tries = 3;
+ int rc = -EAGAIN;
+ ENTRY;
+
+ if (pathlen < 3)
+ RETURN(-EOVERFLOW);
+
+ if (lu_fid_eq(&mdt->mdt_md_root_fid, mdt_object_fid(obj))) {
+ path[0] = '\0';
+ RETURN(0);
+ }
+
+ OBD_ALLOC_PTR(pli);
+ if (pli == NULL)
+ RETURN(-ENOMEM);
+
+ pli->pli_mdt_obj = obj;
+ pli->pli_recno = *recno;
+ pli->pli_path = path;
+ pli->pli_pathlen = pathlen;
+ pli->pli_linkno = *linkno;
+
+ /* Retry multiple times in case file is being moved */
+ while (tries-- && rc == -EAGAIN)
+ rc = mdt_path_current(info, pli);
+
+ /* return the last resolved fids to the client, so the client will
+ * build the left path on another MDT for remote object */
+ *fid = pli->pli_fids[pli->pli_fidcount];
+
+ *recno = pli->pli_currec;
+ /* Return next link index to caller */
+ *linkno = pli->pli_linkno;
+
+ OBD_FREE_PTR(pli);
+
+ RETURN(rc);
+}
+
+static int mdt_fid2path(struct mdt_thread_info *info,
+ struct getinfo_fid2path *fp)
+{
+ struct mdt_device *mdt = info->mti_mdt;
struct obd_device *obd = mdt2obd_dev(mdt);
+ struct mdt_object *obj;
int rc;
ENTRY;
RETURN(-EINVAL);
}
- obj = mdt_object_find(env, mdt, &fp->gf_fid);
+ obj = mdt_object_find(info->mti_env, mdt, &fp->gf_fid);
if (obj == NULL || IS_ERR(obj)) {
CDEBUG(D_IOCTL, "no object "DFID": %ld\n", PFID(&fp->gf_fid),
PTR_ERR(obj));
rc = -EREMOTE;
else if (!mdt_object_exists(obj))
rc = -ENOENT;
+ else
+ rc = 0;
if (rc < 0) {
- mdt_object_put(env, obj);
+ mdt_object_put(info->mti_env, obj);
CDEBUG(D_IOCTL, "nonlocal object "DFID": %d\n",
PFID(&fp->gf_fid), rc);
RETURN(rc);
}
- rc = mo_path(env, md_object_next(&obj->mot_obj), fp->gf_path,
- fp->gf_pathlen, &fp->gf_recno, &fp->gf_linkno);
- mdt_object_put(env, obj);
+ rc = mdt_path(info, obj, fp->gf_path, fp->gf_pathlen, &fp->gf_recno,
+ &fp->gf_linkno, &fp->gf_fid);
+
+ CDEBUG(D_INFO, "fid "DFID", path %s recno "LPX64" linkno %u\n",
+ PFID(&fp->gf_fid), fp->gf_path, fp->gf_recno, fp->gf_linkno);
+
+ mdt_object_put(info->mti_env, obj);
+
+ RETURN(rc);
+}
+
+static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key,
+ void *val, int vallen)
+{
+ struct getinfo_fid2path *fpout, *fpin;
+ int rc = 0;
+
+ fpin = key + cfs_size_round(sizeof(KEY_FID2PATH));
+ fpout = val;
+
+ if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
+ lustre_swab_fid2path(fpin);
+
+ memcpy(fpout, fpin, sizeof(*fpin));
+ if (fpout->gf_pathlen != vallen - sizeof(*fpin))
+ RETURN(-EINVAL);
+ rc = mdt_fid2path(info, fpout);
RETURN(rc);
}