#include <lustre_fid.h>
#include "lmv_internal.h"
-/* object cache. */
-cfs_mem_cache_t *lmv_object_cache;
-cfs_atomic_t lmv_object_count = CFS_ATOMIC_INIT(0);
-
static void lmv_activate_target(struct lmv_obd *lmv,
struct lmv_tgt_desc *tgt,
int activate)
RETURN(rc);
}
+#if 0
static int lmv_all_chars_policy(int count, const char *name,
int len)
{
CERROR("Unsupported placement policy %x\n", placement);
return -EINVAL;
}
+#endif
/**
* This is _inode_ placement policy function (not name).
struct md_op_data *op_data,
mdsno_t *mds)
{
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_object *obj;
- int rc;
- ENTRY;
-
- LASSERT(mds != NULL);
-
- if (lmv->desc.ld_tgt_count == 1) {
- *mds = 0;
- RETURN(0);
- }
-
- /*
- * Allocate new fid on target according to operation type and parent
- * home mds.
- */
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj != NULL || op_data->op_name == NULL ||
- op_data->op_opc != LUSTRE_OPC_MKDIR) {
- /*
- * Allocate fid for non-dir or for null name or for case parent
- * dir is split.
- */
- if (obj) {
- lmv_object_put(obj);
-
- /*
- * If we have this flag turned on, and we see that
- * parent dir is split, this means, that caller did not
- * notice split yet. This is race and we would like to
- * let caller know that.
- */
- if (op_data->op_bias & MDS_CHECK_SPLIT)
- RETURN(-ERESTART);
- }
-
- /*
- * Allocate new fid on same mds where parent fid is located and
- * where operation will be sent. In case of split dir, ->op_fid1
- * and ->op_mds here will contain fid and mds of slave directory
- * object (assigned by caller).
- */
- *mds = op_data->op_mds;
- rc = 0;
- } else {
- /*
- * Parent directory is not split and we want to create a
- * directory in it. Let's calculate where to place it according
- * to operation data @op_data.
- */
- *mds = lmv_choose_mds(lmv, op_data, lmv->lmv_placement);
- rc = 0;
- }
+ LASSERT(mds != NULL);
- if (rc) {
- CERROR("Can't choose MDS, err = %d\n", rc);
- } else {
- LASSERT(*mds < lmv->desc.ld_tgt_count);
- }
+ /* Allocate new fid on target according to to different
+ * QOS policy. In DNE phase I, llite should always tell
+ * which MDT where the dir will be located */
+ *mds = op_data->op_mds;
- RETURN(rc);
+ RETURN(0);
}
int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
spin_lock_init(&lmv->lmv_lock);
mutex_init(&lmv->init_mutex);
- rc = lmv_object_setup(obd);
- if (rc) {
- CERROR("Can't setup LMV object manager, error %d.\n", rc);
- GOTO(out_free_datas, rc);
- }
-
- lprocfs_lmv_init_vars(&lvars);
+ lprocfs_lmv_init_vars(&lvars);
lprocfs_obd_setup(obd, lvars.obd_vars);
#ifdef LPROCFS
{
static int lmv_cleanup(struct obd_device *obd)
{
- struct lmv_obd *lmv = &obd->u.lmv;
- ENTRY;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ ENTRY;
- fld_client_fini(&lmv->lmv_fld);
- lmv_object_cleanup(obd);
- OBD_FREE(lmv->datas, lmv->datas_size);
- OBD_FREE(lmv->tgts, lmv->tgts_size);
+ fld_client_fini(&lmv->lmv_fld);
+ OBD_FREE(lmv->datas, lmv->datas_size);
+ OBD_FREE(lmv->tgts, lmv->tgts_size);
- RETURN(0);
+ RETURN(0);
}
static int lmv_process_config(struct obd_device *obd, obd_count len, void *buf)
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
int rc;
- int i;
ENTRY;
rc = lmv_check_connect(obd);
}
rc = md_getattr(tgt->ltd_exp, op_data, request);
- if (rc)
- RETURN(rc);
-
- obj = lmv_object_find_lock(obd, &op_data->op_fid1);
-
- CDEBUG(D_INODE, "GETATTR for "DFID" %s\n", PFID(&op_data->op_fid1),
- obj ? "(split)" : "");
-
- /*
- * If object is split, then we loop over all the slaves and gather size
- * attribute. In ideal world we would have to gather also mds field from
- * all slaves, as object is spread over the cluster and this is
- * definitely interesting information and it is not good to loss it,
- * but...
- */
- if (obj) {
- struct mdt_body *body;
-
- if (*request == NULL) {
- lmv_object_put(obj);
- RETURN(rc);
- }
-
- body = req_capsule_server_get(&(*request)->rq_pill,
- &RMF_MDT_BODY);
- LASSERT(body != NULL);
-
- for (i = 0; i < obj->lo_objcount; i++) {
- if (lmv->tgts[i].ltd_exp == NULL) {
- CWARN("%s: NULL export for %d\n",
- obd->obd_name, i);
- continue;
- }
-
- /*
- * Skip master object.
- */
- if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid))
- continue;
-
- body->size += obj->lo_stripes[i].ls_size;
- }
-
- lmv_object_put_unlock(obj);
- }
RETURN(rc);
}
RETURN(rc);
}
-/**
- * Called in the case MDS returns -ERESTART on create on open, what means that
- * directory is split and its LMV presentation object has to be updated.
- */
-int lmv_handle_split(struct obd_export *exp, const struct lu_fid *fid)
+struct lmv_tgt_desc
+*lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
+ struct lu_fid *fid)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct ptlrpc_request *req = NULL;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- struct lustre_md md;
- struct md_op_data *op_data;
- int mealen;
- int rc;
- __u64 valid;
- ENTRY;
-
- md.mea = NULL;
- mealen = lmv_get_easize(lmv);
-
- valid = OBD_MD_FLEASIZE | OBD_MD_FLDIREA | OBD_MD_MEA;
-
- tgt = lmv_find_target(lmv, fid);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
-
- /*
- * Time to update mea of parent fid.
- */
-
- OBD_ALLOC_PTR(op_data);
- if (op_data == NULL)
- RETURN(-ENOMEM);
-
- op_data->op_fid1 = *fid;
- op_data->op_mode = mealen;
- op_data->op_valid = valid;
-
- rc = md_getattr(tgt->ltd_exp, op_data, &req);
- OBD_FREE_PTR(op_data);
- if (rc) {
- CERROR("md_getattr() failed, error %d\n", rc);
- GOTO(cleanup, rc);
- }
-
- rc = md_get_lustre_md(tgt->ltd_exp, req, NULL, exp, &md);
- if (rc) {
- CERROR("md_get_lustre_md() failed, error %d\n", rc);
- GOTO(cleanup, rc);
- }
-
- if (md.mea == NULL)
- GOTO(cleanup, rc = -ENODATA);
+ struct lmv_tgt_desc *tgt;
- obj = lmv_object_create(exp, fid, md.mea);
- if (IS_ERR(obj))
- rc = PTR_ERR(obj);
- else
- lmv_object_put(obj);
+ tgt = lmv_find_target(lmv, fid);
+ op_data->op_mds = tgt->ltd_idx;
- obd_free_memmd(exp, (void *)&md.mea);
- EXIT;
-cleanup:
- if (req)
- ptlrpc_req_finished(req);
- return rc;
+ return tgt;
}
int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
__u32 gid, cfs_cap_t cap_effective, __u64 rdev,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- int rc;
- int loop = 0;
- int sidx;
- ENTRY;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
+ ENTRY;
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
- if (!lmv->desc.ld_active_tgt_count)
- RETURN(-EIO);
-repeat:
- ++loop;
- LASSERT(loop <= 2);
-
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- op_data->op_name, op_data->op_namelen);
- op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
- op_data->op_bias &= ~MDS_CHECK_SPLIT;
- op_data->op_mds = obj->lo_stripes[sidx].ls_mds;
- tgt = lmv_get_target(lmv, op_data->op_mds);
- lmv_object_put(obj);
- } else {
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
- op_data->op_bias |= MDS_CHECK_SPLIT;
- op_data->op_mds = tgt->ltd_idx;
- }
+ if (!lmv->desc.ld_active_tgt_count)
+ RETURN(-EIO);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
- if (rc == -ERESTART)
- goto repeat;
- else if (rc)
- RETURN(rc);
+ rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
+ if (rc)
+ RETURN(rc);
- CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
- op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
- op_data->op_mds);
+ CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
+ op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
+ op_data->op_mds);
- op_data->op_flags |= MF_MDC_CANCEL_FID1;
- rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
- cap_effective, rdev, request);
- if (rc == 0) {
- if (*request == NULL)
- RETURN(rc);
- CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
- } else if (rc == -ERESTART) {
- LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
- "Got -ERESTART during create!\n");
- ptlrpc_req_finished(*request);
- *request = NULL;
+ op_data->op_flags |= MF_MDC_CANCEL_FID1;
+ rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
+ cap_effective, rdev, request);
- /*
- * Directory got split. Time to update local object and repeat
- * the request with proper MDS.
- */
- rc = lmv_handle_split(exp, &op_data->op_fid1);
- if (rc == 0) {
- rc = lmv_allocate_slaves(obd, &op_data->op_fid1,
- op_data, &op_data->op_fid2);
- if (rc)
- RETURN(rc);
- goto repeat;
- }
- }
- RETURN(rc);
+ if (rc == 0) {
+ if (*request == NULL)
+ RETURN(rc);
+ CDEBUG(D_INODE, "Created - "DFID"\n", PFID(&op_data->op_fid2));
+ }
+ RETURN(rc);
}
static int lmv_done_writing(struct obd_export *exp,
}
static int
-lmv_enqueue_slaves(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
- struct lookup_intent *it, struct md_op_data *op_data,
- struct lustre_handle *lockh, void *lmm, int lmmsize)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_stripe_md *mea = op_data->op_mea1;
- struct md_op_data *op_data2;
- struct lmv_tgt_desc *tgt;
- int i;
- int rc = 0;
- ENTRY;
-
- OBD_ALLOC_PTR(op_data2);
- if (op_data2 == NULL)
- RETURN(-ENOMEM);
-
- LASSERT(mea != NULL);
- for (i = 0; i < mea->mea_count; i++) {
- memset(op_data2, 0, sizeof(*op_data2));
- op_data2->op_fid1 = mea->mea_ids[i];
- op_data2->op_bias = 0;
-
- tgt = lmv_find_target(lmv, &op_data2->op_fid1);
- if (IS_ERR(tgt))
- GOTO(cleanup, rc = PTR_ERR(tgt));
-
- if (tgt->ltd_exp == NULL)
- continue;
-
- rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data2,
- lockh + i, lmm, lmmsize, NULL, 0);
-
- CDEBUG(D_INODE, "Take lock on slave "DFID" -> %d/%d\n",
- PFID(&mea->mea_ids[i]), rc, it->d.lustre.it_status);
-
- if (rc)
- GOTO(cleanup, rc);
-
- if (it->d.lustre.it_data) {
- struct ptlrpc_request *req;
- req = (struct ptlrpc_request *)it->d.lustre.it_data;
- ptlrpc_req_finished(req);
- }
-
- if (it->d.lustre.it_status)
- GOTO(cleanup, rc = it->d.lustre.it_status);
- }
-
- EXIT;
-cleanup:
- OBD_FREE_PTR(op_data2);
-
- if (rc != 0) {
- /*
- * Drop all taken locks.
- */
- while (--i >= 0) {
- if (lockh[i].cookie)
- ldlm_lock_decref(lockh + i, einfo->ei_mode);
- lockh[i].cookie = 0;
- }
- }
- return rc;
-}
-
-static int
lmv_enqueue_remote(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
struct lookup_intent *it, struct md_op_data *op_data,
struct lustre_handle *lockh, void *lmm, int lmmsize,
struct lustre_handle *lockh, void *lmm, int lmmsize,
struct ptlrpc_request **req, __u64 extra_lock_flags)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- int sidx;
- int rc;
- ENTRY;
-
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
-
- CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
- LL_IT2STR(it), PFID(&op_data->op_fid1));
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
+ ENTRY;
- if (op_data->op_mea1 && it && it->it_op == IT_UNLINK) {
- rc = lmv_enqueue_slaves(exp, einfo, it, op_data,
- lockh, lmm, lmmsize);
- RETURN(rc);
- }
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj && op_data->op_namelen) {
- sidx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- (char *)op_data->op_name,
- op_data->op_namelen);
- op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
- tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds);
- } else {
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
- }
- if (obj)
- lmv_object_put(obj);
+ CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID"\n",
+ LL_IT2STR(it), PFID(&op_data->op_fid1));
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
- LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
+ CDEBUG(D_INODE, "ENQUEUE '%s' on "DFID" -> mds #%d\n",
+ LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
- rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
- lmm, lmmsize, req, extra_lock_flags);
+ rc = md_enqueue(tgt->ltd_exp, einfo, it, op_data, lockh,
+ lmm, lmmsize, req, extra_lock_flags);
- if (rc == 0 && it && it->it_op == IT_OPEN) {
- rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
- lmm, lmmsize, extra_lock_flags);
- }
- RETURN(rc);
+ if (rc == 0 && it && it->it_op == IT_OPEN) {
+ rc = lmv_enqueue_remote(exp, einfo, it, op_data, lockh,
+ lmm, lmmsize, extra_lock_flags);
+ }
+ RETURN(rc);
}
static int
lmv_getattr_name(struct obd_export *exp,struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- struct ptlrpc_request *req = NULL;
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lu_fid rid = op_data->op_fid1;
- struct lmv_tgt_desc *tgt;
- struct mdt_body *body;
- struct lmv_object *obj;
- obd_valid valid = op_data->op_valid;
- int rc;
- int loop = 0;
- int sidx;
- ENTRY;
+ struct ptlrpc_request *req = NULL;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ struct mdt_body *body;
+ int rc;
+ ENTRY;
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
-repeat:
- ++loop;
- LASSERT(loop <= 2);
- obj = lmv_object_find(obd, &rid);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- op_data->op_name, op_data->op_namelen);
- rid = obj->lo_stripes[sidx].ls_fid;
- tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds);
- op_data->op_mds = obj->lo_stripes[sidx].ls_mds;
- valid &= ~OBD_MD_FLCKSPLIT;
- lmv_object_put(obj);
- } else {
- tgt = lmv_find_target(lmv, &rid);
- valid |= OBD_MD_FLCKSPLIT;
- op_data->op_mds = tgt->ltd_idx;
- }
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" - "DFID" -> mds #%d\n",
- op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
- PFID(&rid), tgt->ltd_idx);
-
- op_data->op_valid = valid;
- op_data->op_fid1 = rid;
- rc = md_getattr_name(tgt->ltd_exp, op_data, request);
- if (rc == 0) {
- body = req_capsule_server_get(&(*request)->rq_pill,
- &RMF_MDT_BODY);
- LASSERT(body != NULL);
-
- if (body->valid & OBD_MD_MDS) {
- rid = body->fid1;
- CDEBUG(D_INODE, "Request attrs for "DFID"\n",
- PFID(&rid));
-
- tgt = lmv_find_target(lmv, &rid);
- if (IS_ERR(tgt)) {
- ptlrpc_req_finished(*request);
- RETURN(PTR_ERR(tgt));
- }
+ CDEBUG(D_INODE, "GETATTR_NAME for %*s on "DFID" -> mds #%d\n",
+ op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
+ tgt->ltd_idx);
- op_data->op_fid1 = rid;
- op_data->op_valid |= OBD_MD_FLCROSSREF;
- op_data->op_namelen = 0;
- op_data->op_name = NULL;
- rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
- ptlrpc_req_finished(*request);
- *request = req;
- }
- } else if (rc == -ERESTART) {
- LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
- "Got -ERESTART during getattr!\n");
- ptlrpc_req_finished(*request);
- *request = NULL;
+ rc = md_getattr_name(tgt->ltd_exp, op_data, request);
+ if (rc != 0)
+ RETURN(rc);
- /*
- * Directory got split. Time to update local object and repeat
- * the request with proper MDS.
- */
- rc = lmv_handle_split(exp, &rid);
- if (rc == 0)
- goto repeat;
- }
- RETURN(rc);
+ body = req_capsule_server_get(&(*request)->rq_pill,
+ &RMF_MDT_BODY);
+ LASSERT(body != NULL);
+
+ if (body->valid & OBD_MD_MDS) {
+ struct lu_fid rid = body->fid1;
+ CDEBUG(D_INODE, "Request attrs for "DFID"\n",
+ PFID(&rid));
+
+ tgt = lmv_find_target(lmv, &rid);
+ if (IS_ERR(tgt)) {
+ ptlrpc_req_finished(*request);
+ RETURN(PTR_ERR(tgt));
+ }
+
+ op_data->op_fid1 = rid;
+ op_data->op_valid |= OBD_MD_FLCROSSREF;
+ op_data->op_namelen = 0;
+ op_data->op_name = NULL;
+ rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
+ ptlrpc_req_finished(*request);
+ *request = req;
+ }
+
+ RETURN(rc);
}
#define md_op_data_fid(op_data, fl) \
fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
NULL)
-static int lmv_early_cancel_slaves(struct obd_export *exp,
- struct md_op_data *op_data, int op_tgt,
- ldlm_mode_t mode, int bits, int flag)
-{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- ldlm_policy_data_t policy = {{0}};
- struct lu_fid *op_fid;
- struct lu_fid *st_fid;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- int rc = 0;
- int i;
- ENTRY;
-
- op_fid = md_op_data_fid(op_data, flag);
- if (!fid_is_sane(op_fid))
- RETURN(0);
-
- obj = lmv_object_find(obd, op_fid);
- if (obj == NULL)
- RETURN(-EALREADY);
-
- policy.l_inodebits.bits = bits;
- for (i = 0; i < obj->lo_objcount; i++) {
- tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
- st_fid = &obj->lo_stripes[i].ls_fid;
- if (op_tgt != tgt->ltd_idx) {
- CDEBUG(D_INODE, "EARLY_CANCEL slave "DFID" -> mds #%d\n",
- PFID(st_fid), tgt->ltd_idx);
- rc = md_cancel_unused(tgt->ltd_exp, st_fid, &policy,
- mode, LCF_ASYNC, NULL);
- if (rc)
- GOTO(out_put_obj, rc);
- } else {
- CDEBUG(D_INODE,
- "EARLY_CANCEL skip operation target %d on "DFID"\n",
- op_tgt, PFID(st_fid));
- /*
- * Do not cancel locks for operation target, they will
- * be handled later in underlaying layer when calling
- * function we run on behalf of.
- */
- *op_fid = *st_fid;
- op_data->op_flags |= flag;
- }
- }
- EXIT;
-out_put_obj:
- lmv_object_put(obj);
- return rc;
-}
-
static int lmv_early_cancel(struct obd_export *exp, struct md_op_data *op_data,
int op_tgt, ldlm_mode_t mode, int bits, int flag)
{
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_tgt_desc *tgt;
ldlm_policy_data_t policy = {{0}};
- struct lmv_object *obj;
int rc = 0;
ENTRY;
if (!fid_is_sane(fid))
RETURN(0);
- obj = lmv_object_find(obd, fid);
- if (obj) {
- rc = lmv_early_cancel_slaves(exp, op_data, op_tgt, mode,
- bits, flag);
- lmv_object_put(obj);
- } else {
- tgt = lmv_find_target(lmv, fid);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
-
- if (tgt->ltd_idx != op_tgt) {
- CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
- policy.l_inodebits.bits = bits;
- rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
- mode, LCF_ASYNC, NULL);
- } else {
- CDEBUG(D_INODE,
- "EARLY_CANCEL skip operation target %d on "DFID"\n",
- op_tgt, PFID(fid));
- op_data->op_flags |= flag;
- rc = 0;
- }
+ tgt = lmv_find_target(lmv, fid);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
+
+ if (tgt->ltd_idx != op_tgt) {
+ CDEBUG(D_INODE, "EARLY_CANCEL on "DFID"\n", PFID(fid));
+ policy.l_inodebits.bits = bits;
+ rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
+ mode, LCF_ASYNC, NULL);
+ } else {
+ CDEBUG(D_INODE,
+ "EARLY_CANCEL skip operation target %d on "DFID"\n",
+ op_tgt, PFID(fid));
+ op_data->op_flags |= flag;
+ rc = 0;
+ }
- }
- RETURN(rc);
+ RETURN(rc);
}
/*
static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- int rc;
- int loop = 0;
- mdsno_t mds;
- int sidx;
- ENTRY;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc;
+ ENTRY;
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
-repeat:
- ++loop;
- LASSERT(loop <= 2);
- LASSERT(op_data->op_namelen != 0);
-
- CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
- PFID(&op_data->op_fid2), op_data->op_namelen,
- op_data->op_name, PFID(&op_data->op_fid1));
-
- obj = lmv_object_find(obd, &op_data->op_fid2);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- op_data->op_name,
- op_data->op_namelen);
- op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
- mds = obj->lo_stripes[sidx].ls_mds;
- lmv_object_put(obj);
- } else {
- rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
- if (rc)
- RETURN(rc);
- }
+ LASSERT(op_data->op_namelen != 0);
- CDEBUG(D_INODE, "Forward to mds #%x ("DFID")\n",
- mds, PFID(&op_data->op_fid1));
+ CDEBUG(D_INODE, "LINK "DFID":%*s to "DFID"\n",
+ PFID(&op_data->op_fid2), op_data->op_namelen,
+ op_data->op_name, PFID(&op_data->op_fid1));
- op_data->op_fsuid = cfs_curproc_fsuid();
- op_data->op_fsgid = cfs_curproc_fsgid();
- op_data->op_cap = cfs_curproc_cap_pack();
- tgt = lmv_get_target(lmv, mds);
+ op_data->op_fsuid = cfs_curproc_fsuid();
+ op_data->op_fsgid = cfs_curproc_fsgid();
+ op_data->op_cap = cfs_curproc_cap_pack();
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- /*
- * Cancel UPDATE lock on child (fid1).
- */
- op_data->op_flags |= MF_MDC_CANCEL_FID2;
- rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
- MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
- if (rc == 0)
- rc = md_link(tgt->ltd_exp, op_data, request);
- if (rc == -ERESTART) {
- LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
- "Got -ERESTART during link!\n");
- ptlrpc_req_finished(*request);
- *request = NULL;
+ /*
+ * Cancel UPDATE lock on child (fid1).
+ */
+ op_data->op_flags |= MF_MDC_CANCEL_FID2;
+ rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
+ if (rc != 0)
+ RETURN(rc);
- /*
- * Directory got split. Time to update local object and repeat
- * the request with proper MDS.
- */
- rc = lmv_handle_split(exp, &op_data->op_fid2);
- if (rc == 0)
- goto repeat;
- }
+ rc = md_link(tgt->ltd_exp, op_data, request);
- RETURN(rc);
+ RETURN(rc);
}
static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct lmv_tgt_desc *src_tgt;
- int rc;
- int sidx;
- int loop = 0;
- struct lmv_object *obj;
- mdsno_t mds1;
- mdsno_t mds2;
- ENTRY;
+ struct lmv_tgt_desc *tgt_tgt;
+ int rc;
+ ENTRY;
LASSERT(oldlen != 0);
if (rc)
RETURN(rc);
-repeat:
- ++loop;
- LASSERT(loop <= 2);
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)old, oldlen);
- op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
- mds1 = obj->lo_stripes[sidx].ls_mds;
- CDEBUG(D_INODE, "Parent obj "DFID"\n", PFID(&op_data->op_fid1));
- lmv_object_put(obj);
- } else {
- rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds1);
- if (rc)
- RETURN(rc);
- }
-
- obj = lmv_object_find(obd, &op_data->op_fid2);
- if (obj) {
- /*
- * Directory is already split, so we have to forward request to
- * the right MDS.
- */
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)new, newlen);
-
- mds2 = obj->lo_stripes[sidx].ls_mds;
- op_data->op_fid2 = obj->lo_stripes[sidx].ls_fid;
- CDEBUG(D_INODE, "Parent obj "DFID"\n", PFID(&op_data->op_fid2));
- lmv_object_put(obj);
- } else {
- rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds2);
- if (rc)
- RETURN(rc);
- }
-
- op_data->op_fsuid = cfs_curproc_fsuid();
- op_data->op_fsgid = cfs_curproc_fsgid();
- op_data->op_cap = cfs_curproc_cap_pack();
-
- src_tgt = lmv_get_target(lmv, mds1);
-
- /*
- * LOOKUP lock on src child (fid3) should also be cancelled for
- * src_tgt in mdc_rename.
- */
- op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
-
- /*
- * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
- * own target.
- */
- rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
- LCK_EX, MDS_INODELOCK_UPDATE,
- MF_MDC_CANCEL_FID2);
-
- /*
- * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
- */
- if (rc == 0) {
- rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
- LCK_EX, MDS_INODELOCK_LOOKUP,
- MF_MDC_CANCEL_FID4);
- }
+ op_data->op_fsuid = cfs_curproc_fsuid();
+ op_data->op_fsgid = cfs_curproc_fsgid();
+ op_data->op_cap = cfs_curproc_cap_pack();
+ src_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(src_tgt))
+ RETURN(PTR_ERR(src_tgt));
+
+ tgt_tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
+ if (IS_ERR(tgt_tgt))
+ RETURN(PTR_ERR(tgt_tgt));
+ /*
+ * LOOKUP lock on src child (fid3) should also be cancelled for
+ * src_tgt in mdc_rename.
+ */
+ op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+
+ /*
+ * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
+ * own target.
+ */
+ rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_UPDATE,
+ MF_MDC_CANCEL_FID2);
+
+ /*
+ * Cancel LOOKUP locks on tgt child (fid4) for parent tgt_tgt.
+ */
+ if (rc == 0) {
+ rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_LOOKUP,
+ MF_MDC_CANCEL_FID4);
+ }
- /*
- * Cancel all the locks on tgt child (fid4).
- */
- if (rc == 0)
- rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
- LCK_EX, MDS_INODELOCK_FULL,
- MF_MDC_CANCEL_FID4);
-
- if (rc == 0)
- rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
- new, newlen, request);
-
- if (rc == -ERESTART) {
- LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
- "Got -ERESTART during rename!\n");
- ptlrpc_req_finished(*request);
- *request = NULL;
+ /*
+ * Cancel all the locks on tgt child (fid4).
+ */
+ if (rc == 0)
+ rc = lmv_early_cancel(exp, op_data, src_tgt->ltd_idx,
+ LCK_EX, MDS_INODELOCK_FULL,
+ MF_MDC_CANCEL_FID4);
- /*
- * Directory got split. Time to update local object and repeat
- * the request with proper MDS.
- */
- rc = lmv_handle_split(exp, &op_data->op_fid1);
- if (rc == 0)
- goto repeat;
- }
- RETURN(rc);
+ if (rc == 0)
+ rc = md_rename(src_tgt->ltd_exp, op_data, old, oldlen,
+ new, newlen, request);
+ RETURN(rc);
}
static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request,
struct md_open_data **mod)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct ptlrpc_request *req;
- struct lmv_tgt_desc *tgt;
- struct lmv_object *obj;
- int rc = 0;
- int i;
- ENTRY;
-
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
-
- obj = lmv_object_find(obd, &op_data->op_fid1);
-
- CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x%s\n",
- PFID(&op_data->op_fid1), op_data->op_attr.ia_valid,
- obj ? ", split" : "");
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt;
+ int rc = 0;
+ ENTRY;
- op_data->op_flags |= MF_MDC_CANCEL_FID1;
- if (obj) {
- for (i = 0; i < obj->lo_objcount; i++) {
- op_data->op_fid1 = obj->lo_stripes[i].ls_fid;
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
- tgt = lmv_get_target(lmv, obj->lo_stripes[i].ls_mds);
- if (IS_ERR(tgt)) {
- rc = PTR_ERR(tgt);
- break;
- }
+ CDEBUG(D_INODE, "SETATTR for "DFID", valid 0x%x\n",
+ PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
- rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen,
- ea2, ea2len, &req, mod);
-
- if (lu_fid_eq(&obj->lo_fid, &obj->lo_stripes[i].ls_fid)) {
- /*
- * This is master object and this request should
- * be returned back to llite.
- */
- *request = req;
- } else {
- ptlrpc_req_finished(req);
- }
+ op_data->op_flags |= MF_MDC_CANCEL_FID1;
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- if (rc)
- break;
- }
- lmv_object_put(obj);
- } else {
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
+ ea2len, request, mod);
- rc = md_setattr(tgt->ltd_exp, op_data, ea, ealen, ea2,
- ea2len, request, mod);
- }
- RETURN(rc);
+ RETURN(rc);
}
static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
RETURN(rc);
}
-/**
- * Main purpose of LMV blocking ast is to remove split directory LMV
- * presentation object (struct lmv_object) attached to the lock being revoked.
- */
-int lmv_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag)
-{
- struct lustre_handle lockh;
- struct lmv_object *obj;
- int rc;
- ENTRY;
-
- switch (flag) {
- case LDLM_CB_BLOCKING:
- ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
- if (rc < 0) {
- CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
- RETURN(rc);
- }
- break;
- case LDLM_CB_CANCELING:
- /*
- * Time to drop cached attrs for split directory object
- */
- obj = lock->l_ast_data;
- if (obj) {
- CDEBUG(D_INODE, "Cancel %s on "LPU64"/"LPU64
- ", master "DFID"\n",
- lock->l_resource->lr_name.name[3] == 1 ?
- "LOOKUP" : "UPDATE",
- lock->l_resource->lr_name.name[0],
- lock->l_resource->lr_name.name[1],
- PFID(&obj->lo_fid));
- lmv_object_put(obj);
- }
- break;
- default:
- LBUG();
- }
- RETURN(0);
-}
-
+#if 0
static void lmv_hash_adjust(__u64 *hash, __u64 hash_adj)
{
__u64 val;
return id ^ (id >> 32);
}
+#endif
static int lmv_readpage(struct obd_export *exp, struct md_op_data *op_data,
struct page **pages, struct ptlrpc_request **request)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_object *obj;
- struct lu_fid rid = op_data->op_fid1;
__u64 offset = op_data->op_offset;
- __u64 hash_adj = 0;
- __u32 rank = 0;
- __u64 seg_size = 0;
- __u64 tgt_tmp = 0;
- int tgt_idx = 0;
- int tgt0_idx = 0;
int rc;
- int nr = 0;
int i;
/* number of pages read, in CFS_PAGE_SIZE */
int nrdpgs;
/* number of pages transferred in LU_PAGE_SIZE */
int nlupgs;
- struct lmv_stripe *los;
struct lmv_tgt_desc *tgt;
struct lu_dirpage *dp;
struct lu_dirent *ent;
if (rc)
RETURN(rc);
- CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n", offset, PFID(&rid));
-
- /*
- * This case handle directory lookup in clustered metadata case (i.e.
- * split directory is located on multiple md servers.)
- * each server keeps directory entries for certain range of hashes.
- * E.g. we have N server and suppose hash range is 0 to MAX_HASH.
- * first server will keep records with hashes [ 0 ... MAX_HASH / N - 1],
- * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and
- * so on....
- * readdir can simply start reading entries from 0 - N server in
- * order but that will not scale well as all client will request dir in
- * to server in same order.
- * Following algorithm does optimization:
- * Instead of doing readdir in 1, 2, ...., N order, client with a
- * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order.
- * (every client has rank R)
- * But ll_readdir() expect offset range [0 to MAX_HASH/N) but
- * since client ask dir from MDS{R} client has pages with offsets
- * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
- * on hash values that we get.
- */
- obj = lmv_object_find_lock(obd, &rid);
- if (obj) {
- nr = obj->lo_objcount;
- LASSERT(nr > 0);
- seg_size = MAX_HASH_SIZE;
- do_div(seg_size, nr);
- los = obj->lo_stripes;
- tgt = lmv_get_target(lmv, los[0].ls_mds);
- rank = lmv_node_rank(tgt->ltd_exp, &rid) % nr;
- tgt_tmp = offset;
- do_div(tgt_tmp, seg_size);
- tgt0_idx = do_div(tgt_tmp, nr);
- tgt_idx = (tgt0_idx + rank) % nr;
-
- if (tgt_idx < tgt0_idx)
- /*
- * Wrap around.
- *
- * Last segment has unusual length due to division
- * rounding.
- */
- hash_adj = MAX_HASH_SIZE - seg_size * nr;
- else
- hash_adj = 0;
-
- hash_adj += rank * seg_size;
-
- CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
- LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
- offset, tgt0_idx, offset + hash_adj, tgt_idx);
-
- offset = (offset + hash_adj) & MAX_HASH_SIZE;
- rid = obj->lo_stripes[tgt_idx].ls_fid;
- tgt = lmv_get_target(lmv, los[tgt_idx].ls_mds);
-
- CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n",
- PFID(&rid), (unsigned long)offset, tgt_idx);
- } else
- tgt = lmv_find_target(lmv, &rid);
-
- if (IS_ERR(tgt))
- GOTO(cleanup, rc = PTR_ERR(tgt));
+ CDEBUG(D_INODE, "READPAGE at "LPX64" from "DFID"\n",
+ offset, PFID(&op_data->op_fid1));
+
+ /*
+ * This case handle directory lookup in clustered metadata case (i.e.
+ * split directory is located on multiple md servers.)
+ * each server keeps directory entries for certain range of hashes.
+ * E.g. we have N server and suppose hash range is 0 to MAX_HASH.
+ * first server will keep records with hashes [ 0 ... MAX_HASH /N - 1],
+ * second one with hashes [MAX_HASH / N ... 2 * MAX_HASH / N] and
+ * so on....
+ * readdir can simply start reading entries from 0 - N server in
+ * order but that will not scale well as all client will request dir in
+ * to server in same order.
+ * Following algorithm does optimization:
+ * Instead of doing readdir in 1, 2, ...., N order, client with a
+ * rank R does readdir in R, R + 1, ..., N, 1, ... R - 1 order.
+ * (every client has rank R)
+ * But ll_readdir() expect offset range [0 to MAX_HASH/N) but
+ * since client ask dir from MDS{R} client has pages with offsets
+ * [R*MAX_HASH/N ... (R + 1)*MAX_HASH/N] there for we do hash_adj
+ * on hash values that we get.
+ if (0) {
+ LASSERT(nr > 0);
+ seg_size = MAX_HASH_SIZE;
+ do_div(seg_size, nr);
+ los = obj->lo_stripes;
+ tgt = lmv_get_target(lmv, los[0].ls_mds);
+ rank = lmv_node_rank(tgt->ltd_exp, fid) % nr;
+ tgt_tmp = offset;
+ do_div(tgt_tmp, seg_size);
+ tgt0_idx = do_div(tgt_tmp, nr);
+ tgt_idx = (tgt0_idx + rank) % nr;
+
+ if (tgt_idx < tgt0_idx)
+ * Wrap around.
+ *
+ * Last segment has unusual length due to division
+ * rounding.
+ hash_adj = MAX_HASH_SIZE - seg_size * nr;
+ else
+ hash_adj = 0;
+
+ hash_adj += rank * seg_size;
+
+ CDEBUG(D_INODE, "Readpage hash adjustment: %x "LPX64" "
+ LPX64"/%x -> "LPX64"/%x\n", rank, hash_adj,
+ offset, tgt0_idx, offset + hash_adj, tgt_idx);
+
+ offset = (offset + hash_adj) & MAX_HASH_SIZE;
+ rid = lsm->mea_oinfo[tgt_idx].lmo_fid;
+ tgt = lmv_get_target(lmv, lsm->mea_oinfo[tgt_idx].lmo_mds);
+
+ CDEBUG(D_INODE, "Forward to "DFID" with offset %lu i %d\n",
+ PFID(&rid), (unsigned long)offset, tgt_idx);
+ }
+ */
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- op_data->op_fid1 = rid;
- rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
- if (rc)
- GOTO(cleanup, rc);
+ rc = md_readpage(tgt->ltd_exp, op_data, pages, request);
+ if (rc != 0)
+ RETURN(rc);
- nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
- >> CFS_PAGE_SHIFT;
- nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
- LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
- LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages);
+ nrdpgs = ((*request)->rq_bulk->bd_nob_transferred + CFS_PAGE_SIZE - 1)
+ >> CFS_PAGE_SHIFT;
+ nlupgs = (*request)->rq_bulk->bd_nob_transferred >> LU_PAGE_SHIFT;
+ LASSERT(!((*request)->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK));
+ LASSERT(nrdpgs > 0 && nrdpgs <= op_data->op_npages);
- CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs,
- op_data->op_npages);
+ CDEBUG(D_INODE, "read %d(%d)/%d pages\n", nrdpgs, nlupgs,
+ op_data->op_npages);
- for (i = 0; i < nrdpgs; i++) {
+ for (i = 0; i < nrdpgs; i++) {
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
- struct lu_dirpage *first;
- __u64 hash_end = 0;
- __u32 flags = 0;
+ struct lu_dirpage *first;
+ __u64 hash_end = 0;
+ __u32 flags = 0;
#endif
- struct lu_dirent *tmp = NULL;
-
- dp = cfs_kmap(pages[i]);
- if (obj) {
- lmv_hash_adjust(&dp->ldp_hash_start, hash_adj);
- lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
- LASSERT(le64_to_cpu(dp->ldp_hash_start) <=
- op_data->op_offset);
-
- if ((tgt0_idx != nr - 1) &&
- (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF))
- {
- dp->ldp_hash_end = cpu_to_le32(seg_size *
- (tgt0_idx + 1));
- CDEBUG(D_INODE,
- ""DFID" reset end "LPX64" tgt %d\n",
- PFID(&rid),
- (__u64)le64_to_cpu(dp->ldp_hash_end),
- tgt_idx);
- }
- }
+ struct lu_dirent *tmp = NULL;
- ent = lu_dirent_start(dp);
+ dp = cfs_kmap(pages[i]);
+ ent = lu_dirent_start(dp);
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
- first = dp;
- hash_end = dp->ldp_hash_end;
+ first = dp;
+ hash_end = dp->ldp_hash_end;
repeat:
#endif
- nlupgs--;
- for (tmp = ent; ent != NULL;
- tmp = ent, ent = lu_dirent_next(ent)) {
- if (obj)
- lmv_hash_adjust(&ent->lde_hash, hash_adj);
- }
+ nlupgs--;
+ for (tmp = ent; ent != NULL;
+ tmp = ent, ent = lu_dirent_next(ent));
#if CFS_PAGE_SIZE > LU_PAGE_SIZE
- dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
- if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
- ent = lu_dirent_start(dp);
-
- if (obj) {
- lmv_hash_adjust(&dp->ldp_hash_end, hash_adj);
- if ((tgt0_idx != nr - 1) &&
- (le64_to_cpu(dp->ldp_hash_end) ==
- MDS_DIR_END_OFF)) {
- hash_end = cpu_to_le32(seg_size *
- (tgt0_idx + 1));
- CDEBUG(D_INODE,
- ""DFID" reset end "LPX64" tgt %d\n",
- PFID(&rid),
- (__u64)le64_to_cpu(hash_end),
- tgt_idx);
- }
- }
- hash_end = dp->ldp_hash_end;
- flags = dp->ldp_flags;
-
- if (tmp) {
- /* enlarge the end entry lde_reclen from 0 to
- * first entry of next lu_dirpage, in this way
- * several lu_dirpages can be stored into one
- * client page on client. */
- tmp = ((void *)tmp) +
- le16_to_cpu(tmp->lde_reclen);
- tmp->lde_reclen =
- cpu_to_le16((char *)(dp->ldp_entries) -
- (char *)tmp);
- goto repeat;
- }
- }
- first->ldp_hash_end = hash_end;
- first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
- first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
+ dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE);
+ if (((unsigned long)dp & ~CFS_PAGE_MASK) && nlupgs > 0) {
+ ent = lu_dirent_start(dp);
+
+ if (tmp) {
+ /* enlarge the end entry lde_reclen from 0 to
+ * first entry of next lu_dirpage, in this way
+ * several lu_dirpages can be stored into one
+ * client page on client. */
+ tmp = ((void *)tmp) +
+ le16_to_cpu(tmp->lde_reclen);
+ tmp->lde_reclen =
+ cpu_to_le16((char *)(dp->ldp_entries) -
+ (char *)tmp);
+ goto repeat;
+ }
+ }
+ first->ldp_hash_end = hash_end;
+ first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE);
+ first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE);
#else
- SET_BUT_UNUSED(tmp);
+ SET_BUT_UNUSED(tmp);
#endif
- cfs_kunmap(pages[i]);
- }
- EXIT;
-cleanup:
- if (obj)
- lmv_object_put_unlock(obj);
- return rc;
+ cfs_kunmap(pages[i]);
+ }
+ RETURN(rc);
}
static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_tgt_desc *tgt = NULL;
- struct lmv_object *obj;
- int rc;
- int sidx;
- int loop = 0;
- ENTRY;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt = NULL;
+ int rc;
+ ENTRY;
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
-repeat:
- ++loop;
- LASSERT(loop <= 2);
- LASSERT(op_data->op_namelen != 0);
-
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype,
- obj->lo_objcount,
- op_data->op_name,
- op_data->op_namelen);
- op_data->op_bias &= ~MDS_CHECK_SPLIT;
- op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
- tgt = lmv_get_target(lmv,
- obj->lo_stripes[sidx].ls_mds);
- lmv_object_put(obj);
- CDEBUG(D_INODE, "UNLINK '%*s' in "DFID" -> %u\n",
- op_data->op_namelen, op_data->op_name,
- PFID(&op_data->op_fid1), sidx);
- }
-
- if (tgt == NULL) {
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
- op_data->op_bias |= MDS_CHECK_SPLIT;
- }
-
- op_data->op_fsuid = cfs_curproc_fsuid();
- op_data->op_fsgid = cfs_curproc_fsgid();
- op_data->op_cap = cfs_curproc_cap_pack();
+ tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- /*
- * If child's fid is given, cancel unused locks for it if it is from
- * another export than parent.
- *
- * LOOKUP lock for child (fid3) should also be cancelled on parent
- * tgt_tgt in mdc_unlink().
- */
- op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+ op_data->op_fsuid = cfs_curproc_fsuid();
+ op_data->op_fsgid = cfs_curproc_fsgid();
+ op_data->op_cap = cfs_curproc_cap_pack();
- /*
- * Cancel FULL locks on child (fid3).
- */
- rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
- MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
+ /*
+ * If child's fid is given, cancel unused locks for it if it is from
+ * another export than parent.
+ *
+ * LOOKUP lock for child (fid3) should also be cancelled on parent
+ * tgt_tgt in mdc_unlink().
+ */
+ op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
- if (rc == 0)
- rc = md_unlink(tgt->ltd_exp, op_data, request);
+ /*
+ * Cancel FULL locks on child (fid3).
+ */
+ rc = lmv_early_cancel(exp, op_data, tgt->ltd_idx, LCK_EX,
+ MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
- if (rc == -ERESTART) {
- LASSERT(*request != NULL);
- DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
- "Got -ERESTART during unlink!\n");
- ptlrpc_req_finished(*request);
- *request = NULL;
+ if (rc != 0)
+ RETURN(rc);
- /*
- * Directory got split. Time to update local object and repeat
- * the request with proper MDS.
- */
- rc = lmv_handle_split(exp, &op_data->op_fid1);
- if (rc == 0)
- goto repeat;
- }
- RETURN(rc);
+ rc = md_unlink(tgt->ltd_exp, op_data, request);
+
+ RETURN(rc);
}
static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
struct md_enqueue_info *minfo,
struct ldlm_enqueue_info *einfo)
{
- struct md_op_data *op_data = &minfo->mi_data;
- struct obd_device *obd = exp->exp_obd;
- struct lmv_obd *lmv = &obd->u.lmv;
- struct lmv_object *obj;
- struct lmv_tgt_desc *tgt = NULL;
- int rc;
- int sidx;
- ENTRY;
-
- rc = lmv_check_connect(obd);
- if (rc)
- RETURN(rc);
-
- if (op_data->op_namelen) {
- obj = lmv_object_find(obd, &op_data->op_fid1);
- if (obj) {
- sidx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
- (char *)op_data->op_name,
- op_data->op_namelen);
- op_data->op_fid1 = obj->lo_stripes[sidx].ls_fid;
- tgt = lmv_get_target(lmv, obj->lo_stripes[sidx].ls_mds);
- lmv_object_put(obj);
- }
- }
+ struct md_op_data *op_data = &minfo->mi_data;
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lmv_tgt_desc *tgt = NULL;
+ int rc;
+ ENTRY;
- if (tgt == NULL)
- tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ rc = lmv_check_connect(obd);
+ if (rc)
+ RETURN(rc);
- if (IS_ERR(tgt))
- RETURN(PTR_ERR(tgt));
+ tgt = lmv_find_target(lmv, &op_data->op_fid1);
+ if (IS_ERR(tgt))
+ RETURN(PTR_ERR(tgt));
- rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
- RETURN(rc);
+ rc = md_intent_getattr_async(tgt->ltd_exp, minfo, einfo);
+ RETURN(rc);
}
int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
struct lprocfs_static_vars lvars;
int rc;
- lmv_object_cache = cfs_mem_cache_create("lmv_objects",
- sizeof(struct lmv_object),
- 0, 0);
- if (!lmv_object_cache) {
- CERROR("Error allocating lmv objects cache\n");
- return -ENOMEM;
- }
-
lprocfs_lmv_init_vars(&lvars);
rc = class_register_type(&lmv_obd_ops, &lmv_md_ops,
lvars.module_vars, LUSTRE_LMV_NAME, NULL);
- if (rc)
- cfs_mem_cache_destroy(lmv_object_cache);
-
return rc;
}
static void lmv_exit(void)
{
class_unregister_type(LUSTRE_LMV_NAME);
-
- LASSERTF(cfs_atomic_read(&lmv_object_count) == 0,
- "Can't free lmv objects cache, %d object(s) busy\n",
- cfs_atomic_read(&lmv_object_count));
- cfs_mem_cache_destroy(lmv_object_cache);
}
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");