- added proc stats to mdt.
LPROC_CMM_SPLIT_CHECK = 0,
LPROC_CMM_SPLIT,
LPROC_CMM_LOOKUP,
+ LPROC_CMM_CREATE,
LPROC_CMM_LAST
};
cmm->cmm_stats = stats;
+ lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_CREATE,
+ LPROCFS_CNTR_AVGMINMAX, "create", "time");
lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_LOOKUP,
LPROCFS_CNTR_AVGMINMAX, "lookup", "time");
lprocfs_counter_init(cmm->cmm_stats, LPROC_CMM_SPLIT,
const struct lu_fid *fid = &loh->loh_fid;
struct lu_object *lo = NULL;
struct cmm_device *cd;
- mdsno_t mdsnum;
+ mdsno_t mds;
int rc = 0;
ENTRY;
cd = lu2cmm_dev(ld);
if (cd->cmm_flags & CMM_INITIALIZED) {
/* get object location */
- rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mdsnum, env);
+ rc = cmm_fld_lookup(lu2cmm_dev(ld), fid, &mds, env);
if (rc)
RETURN(NULL);
} else
* as part of early bootstrap procedure (it is /ROOT, or /fld,
* etc.). Such object *has* to be local.
*/
- mdsnum = cd->cmm_local_num;
+ mds = cd->cmm_local_num;
/* select the proper set of operations based on object location */
- if (mdsnum == cd->cmm_local_num) {
+ if (mds == cd->cmm_local_num) {
struct cml_object *clo;
OBD_ALLOC_PTR(clo);
cro->cmm_obj.cmo_obj.mo_ops = &cmr_mo_ops;
cro->cmm_obj.cmo_obj.mo_dir_ops = &cmr_dir_ops;
lo->lo_ops = &cmr_obj_ops;
- cro->cmo_num = mdsnum;
+ cro->cmo_num = mds;
}
}
RETURN(lo);
const char *name, struct md_object *mo_c,
struct md_op_spec *spec, struct md_attr *ma)
{
+ struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo_p));
+ struct timeval start;
int rc;
ENTRY;
-#ifdef HAVE_SPLIT_SUPPORT
+ cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_CREATE);
+#ifdef HAVE_SPLIT_SUPPORT
/* Lock mode always should be sane. */
LASSERT(spec->sp_cr_mode != MDL_MINMODE);
* Sigh... This is long story. MDT may have race with detecting if split
* is possible in cmm. We know this race and let it live, because
* getting it rid (with some sem or spinlock) will also mean that
- * PDIROPS for create will not work, what is really bad for performance
- * and makes no sense. So, we better allow the race but split dir only
- * if some of concurrent threads takes EX lock. So that, say, two
- * concurrent threads may have different lock modes on directory (CW and
- * EX) and not first one which comes here should split dir, but only
- * that which has EX lock. And we do not care that in this case, split
- * will happen a bit later may be (when dir size will not be mandatory
- * 64K, but may be larger). So that, we allow concurrent creates and
- * protect split by EX lock.
+ * PDIROPS for create will not work because we kill parallel work, what
+ * is really bad for performance and makes no sense having PDIROPS. So,
+ * we better allow the race to live, but split dir only if some of
+ * concurrent threads takes EX lock, not matter which one. So that, say,
+ * two concurrent threads may have different lock modes on directory (CW
+ * and EX) and not first one which comes here and see that split is
+ * possible should split the dir, but only that one which has EX
+ * lock. And we do not care that in this case, split may happen a bit
+ * later (when dir size will not be necessarily 64K, but may be a bit
+ * larger). So that, we allow concurrent creates and protect split by EX
+ * lock.
*/
if (spec->sp_cr_mode == MDL_EX) {
/*
* -ERESTART or some split error is returned, we can't
* proceed with create.
*/
- RETURN(rc);
+ GOTO(out, rc);
}
if (spec != NULL && spec->sp_ck_split) {
*/
rc = cmm_split_check(env, mo_p, name);
if (rc)
- RETURN(rc);
+ GOTO(out, rc);
}
#endif
rc = mdo_create(env, md_object_next(mo_p), name, md_object_next(mo_c),
spec, ma);
- RETURN(rc);
+ EXIT;
+out:
+ cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_CREATE);
+ return rc;
}
static int cml_create_data(const struct lu_env *env, struct md_object *p,
int rc;
ENTRY;
+
/* check the SGID attr */
cmi = cmm_env_info(env);
LASSERT(cmi);
tmp_ma->ma_need |= MA_ACL_DEF;
}
#endif
-
rc = mo_attr_get(env, md_object_next(mo_p), tmp_ma);
if (rc)
RETURN(rc);
}
#endif
- /* remote object creation and local name insert */
+ /* Remote object creation and local name insert. */
rc = mo_object_create(env, md_object_next(mo_c), spec, ma);
if (rc == 0) {
rc = mdo_name_insert(env, md_object_next(mo_p),
#include <linux/lustre_version.h>
#ifndef LUSTRE_KERNEL_VERSION
-#define IT_OPEN (1)
-#define IT_CREAT (1<<1)
-#define IT_READDIR (1<<2)
-#define IT_GETATTR (1<<3)
-#define IT_LOOKUP (1<<4)
-#define IT_UNLINK (1<<5)
-#define IT_TRUNC (1<<6)
-#define IT_GETXATTR (1<<7)
+
+#define IT_OPEN (1 << 0)
+#define IT_CREAT (1 << 1)
+#define IT_READDIR (1 << 2)
+#define IT_GETATTR (1 << 3)
+#define IT_LOOKUP (1 << 4)
+#define IT_UNLINK (1 << 5)
+#define IT_TRUNC (1 << 6)
+#define IT_GETXATTR (1 << 7)
struct lustre_intent_data {
int it_disposition;
} d;
};
-
#endif
#endif
extern void lustre_swab_obdo (struct obdo *o);
+enum {
+ LUSTRE_OPC_MKDIR = (1 << 0),
+ LUSTRE_OPC_SYMLINK = (1 << 1),
+ LUSTRE_OPC_MKNOD = (1 << 2),
+ LUSTRE_OPC_CREATE = (1 << 3),
+ LUSTRE_OPC_ANY = (1 << 4)
+};
+
struct md_op_data {
struct lu_fid op_fid1;
struct lu_fid op_fid2;
/* Various operation flags. */
__u32 op_bias;
+
+ /* Operation type */
+ __u32 op_opc;
};
#define MDS_MODE_DONT_LOCK (1 << 30)
int rc;
};
-#define LUSTRE_OPC_MKDIR (1 << 0)
-#define LUSTRE_OPC_SYMLINK (1 << 1)
-#define LUSTRE_OPC_MKNOD (1 << 2)
-#define LUSTRE_OPC_CREATE (1 << 3)
-
-struct lu_placement_hint {
- struct qstr *ph_pname;
- struct lu_fid *ph_pfid;
- struct qstr *ph_cname;
- int ph_opc;
-};
-
#define LUSTRE_FLD_NAME "fld"
#define LUSTRE_SEQ_NAME "seq"
struct obd_connect_data *ocd);
int (*o_disconnect)(struct obd_export *exp);
- /* maybe later these should be moved into separate fid_ops */
+ /* Initialize/finalize fids infrastructure. */
int (*o_fid_init)(struct obd_export *exp);
int (*o_fid_fini)(struct obd_export *exp);
+ /* Allocate new fid according to passed @hint. */
int (*o_fid_alloc)(struct obd_export *exp, struct lu_fid *fid,
- struct lu_placement_hint *hint);
+ struct md_op_data *op_data);
+ /*
+ * Object with @fid is getting deleted, we may want to do something
+ * about this.
+ */
int (*o_fid_delete)(struct obd_export *exp, const struct lu_fid *fid);
int (*o_statfs)(struct obd_device *obd, struct obd_statfs *osfs,
ENTRY;
if (OBP(exp->exp_obd, fid_init) == NULL)
- RETURN(-ENOTSUPP);
+ RETURN(0);
OBD_COUNTER_INCREMENT(exp->exp_obd, fid_init);
ENTRY;
if (OBP(exp->exp_obd, fid_fini) == NULL)
- RETURN(-ENOTSUPP);
+ RETURN(0);
OBD_COUNTER_INCREMENT(exp->exp_obd, fid_fini);
static inline int obd_fid_alloc(struct obd_export *exp,
struct lu_fid *fid,
- struct lu_placement_hint *hint)
+ struct md_op_data *op_data)
{
int rc;
ENTRY;
OBD_COUNTER_INCREMENT(exp->exp_obd, fid_alloc);
- rc = OBP(exp->exp_obd, fid_alloc)(exp, fid, hint);
+ rc = OBP(exp->exp_obd, fid_alloc)(exp, fid, op_data);
RETURN(rc);
}
rc = ldlm_lock_match(obddev->obd_namespace, LDLM_FL_BLOCK_GRANTED,
&res_id, LDLM_IBITS, &policy, LCK_CR, &lockh);
if (!rc) {
- llu_prep_md_op_data(&op_data, inode, NULL, NULL, 0, 0);
+ llu_prep_md_op_data(&op_data, inode, NULL, NULL, 0, 0,
+ LUSTRE_OPC_ANY);
rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, &it, LCK_CR,
&op_data, &lockh, NULL, 0,
void llu_prep_md_op_data(struct md_op_data *op_data, struct inode *i1,
struct inode *i2, const char *name, int namelen,
- int mode)
+ int mode, __u32 opc)
{
LASSERT(i1 != NULL || i2 != NULL);
LASSERT(op_data);
if (i2)
op_data->op_fid2 = *ll_inode2fid(i2);
+ else
+ fid_zero(&op_data->op_fid2);
+ op_data->op_opc = opc;
op_data->op_name = name;
- op_data->op_namelen = namelen;
op_data->op_mode = mode;
+ op_data->op_namelen = namelen;
op_data->op_mod_time = CURRENT_TIME;
}
#include <lustre_ver.h>
#include <lustre_fid.h>
-static int llu_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
- struct lu_placement_hint *hint)
-{
- int rc;
- ENTRY;
- rc = obd_fid_alloc(exp, fid, hint);
- RETURN(rc);
-}
-
-/* allocates passed fid, that is assigns f_num and f_seq to the @fid */
-int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid,
- struct lu_placement_hint *hint)
-{
- ENTRY;
- RETURN(llu_fid_alloc(sbi->ll_md_exp, fid, hint));
-}
-
-/* allocates passed fid, that is assigns f_num and f_seq to the @fid */
-int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid,
- struct lu_placement_hint *hint)
-{
- ENTRY;
- RETURN(llu_fid_alloc(sbi->ll_dt_exp, fid, hint));
-}
-
/* build inode number on passed @fid */
unsigned long llu_fid_build_ino(struct llu_sb_info *sbi,
struct lu_fid *fid)
/* file.c */
void llu_prep_md_op_data(struct md_op_data *op_data, struct inode *i1,
struct inode *i2, const char *name, int namelen,
- int mode);
+ int mode, __u32 opc);
void llu_finish_md_op_data(struct md_op_data *op_data);
int llu_create(struct inode *dir, struct pnode_base *pnode, int mode);
int llu_local_open(struct llu_inode_info *lli, struct lookup_intent *it);
char *buf, size_t nbytes);
/* liblustre/llite_fid.c*/
-int llu_fid_md_alloc(struct llu_sb_info *sbi, struct lu_fid *fid,
- struct lu_placement_hint *hint);
-
-int llu_fid_dt_alloc(struct llu_sb_info *sbi, struct lu_fid *fid,
- struct lu_placement_hint *hint);
-
unsigned long llu_fid_build_ino(struct llu_sb_info *sbi,
struct lu_fid *fid);
}
llu_prep_md_op_data(&op_data, pnode->p_parent->p_base->pb_ino,
- pb->pb_ino, pb->pb_name.name, pb->pb_name.len,0);
+ pb->pb_ino, pb->pb_name.name, pb->pb_name.len,
+ 0, LUSTRE_OPC_ANY);
rc = md_intent_lock(exp, &op_data, NULL, 0, it, flags,
&req, llu_md_blocking_ast,
struct it_cb_data icbd;
struct ptlrpc_request *req = NULL;
struct lookup_intent lookup_it = { .it_op = IT_LOOKUP };
+ __u32 opc;
int rc;
ENTRY;
icbd.icbd_child = pnode;
icbd.icbd_parent = parent;
- llu_prep_md_op_data(&op_data, parent, NULL,
- pnode->p_base->pb_name.name,
- pnode->p_base->pb_name.len, flags);
-
- /* allocate new fid for child */
if (it->it_op & IT_CREAT ||
(it->it_op & IT_OPEN && it->it_create_mode & O_CREAT)) {
- struct lu_placement_hint hint = { .ph_pname = NULL,
- .ph_cname = &pnode->p_base->pb_name,
- .ph_opc = LUSTRE_OPC_CREATE };
-
- rc = llu_fid_md_alloc(llu_i2sbi(parent), &op_data.op_fid2, &hint);
- if (rc) {
- CERROR("can't allocate new fid, rc %d\n", rc);
- LBUG();
- }
+ opc = LUSTRE_OPC_CREATE;
+ } else {
+ opc = LUSTRE_OPC_ANY;
}
+
+ llu_prep_md_op_data(&op_data, parent, NULL,
+ pnode->p_base->pb_name.name,
+ pnode->p_base->pb_name.len, flags, opc);
+
rc = md_intent_lock(llu_i2mdcexp(parent), &op_data, NULL, 0, it,
flags, &req, llu_md_blocking_ast,
LDLM_FL_CANCEL_ON_BLOCK);
int rc;
ENTRY;
- llu_prep_md_op_data(op_data, inode, NULL, NULL, 0, 0);
+ llu_prep_md_op_data(op_data, inode, NULL, NULL, 0, 0, LUSTRE_OPC_ANY);
rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, NULL, 0, &request);
if (rc) {
struct ptlrpc_request *request = NULL;
struct llu_sb_info *sbi = llu_i2sbi(dir);
struct md_op_data op_data;
- struct lu_placement_hint hint = {
- .ph_pname = NULL,
- .ph_cname = qstr,
- .ph_opc = LUSTRE_OPC_SYMLINK
- };
int err = -EMLINK;
ENTRY;
if (llu_i2stat(dir)->st_nlink >= EXT2_LINK_MAX)
RETURN(err);
- llu_prep_md_op_data(&op_data, dir, NULL, name, len, 0);
+ llu_prep_md_op_data(&op_data, dir, NULL, name, len, 0,
+ LUSTRE_OPC_SYMLINK);
- /* allocate new fid */
- err = llu_fid_md_alloc(sbi, &op_data.op_fid2, &hint);
- if (err) {
- CERROR("can't allocate new fid, rc %d\n", err);
- RETURN(err);
- }
err = md_create(sbi->ll_md_exp, &op_data,
tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO,
current->fsuid, current->fsgid, current->cap_effective,
struct llu_sb_info *sbi = llu_i2sbi(dir);
struct md_op_data op_data;
int err = -EMLINK;
- struct lu_placement_hint hint = {
- .ph_pname = NULL,
- .ph_cname = &pno->p_base->pb_name,
- .ph_opc = LUSTRE_OPC_MKNOD
- };
ENTRY;
liblustre_wait_event(0);
case S_IFSOCK:
llu_prep_md_op_data(&op_data, dir, NULL,
pno->p_base->pb_name.name,
- pno->p_base->pb_name.len, 0);
- /* allocate new fid */
- err = llu_fid_md_alloc(sbi, &op_data.op_fid2, &hint);
- if (err) {
- CERROR("can't allocate new fid, rc %d\n", err);
- RETURN(err);
- }
+ pno->p_base->pb_name.len, 0,
+ LUSTRE_OPC_MKNOD);
err = md_create(sbi->ll_md_exp, &op_data, NULL, 0, mode,
current->fsuid, current->fsgid,
LASSERT(dir);
liblustre_wait_event(0);
- llu_prep_md_op_data(&op_data, src, dir, name, namelen, 0);
+ llu_prep_md_op_data(&op_data, src, dir, name, namelen, 0,
+ LUSTRE_OPC_ANY);
rc = md_link(llu_i2sbi(src)->ll_md_exp, &op_data, &request);
ptlrpc_req_finished(request);
liblustre_wait_event(0);
LASSERT(target);
liblustre_wait_event(0);
- llu_prep_md_op_data(&op_data, dir, NULL, name, len, 0);
+ llu_prep_md_op_data(&op_data, dir, NULL, name, len, 0,
+ LUSTRE_OPC_ANY);
rc = md_unlink(llu_i2sbi(dir)->ll_md_exp, &op_data, &request);
if (!rc)
rc = llu_objects_destroy(request, dir);
LASSERT(tgt);
liblustre_wait_event(0);
- llu_prep_md_op_data(&op_data, src, tgt, NULL, 0, 0);
+ llu_prep_md_op_data(&op_data, src, tgt, NULL, 0, 0,
+ LUSTRE_OPC_ANY);
rc = md_rename(llu_i2sbi(src)->ll_md_exp, &op_data,
oldname, oldnamelen, newname, newnamelen,
&request);
struct ptlrpc_request *request = NULL;
struct intnl_stat *st = llu_i2stat(dir);
struct md_op_data op_data;
- struct lu_placement_hint hint = {
- .ph_pname = NULL,
- .ph_cname = qstr,
- .ph_opc = LUSTRE_OPC_MKDIR
- };
-
int err = -EMLINK;
ENTRY;
if (st->st_nlink >= EXT2_LINK_MAX)
RETURN(err);
- llu_prep_md_op_data(&op_data, dir, NULL, name, len, 0);
+ llu_prep_md_op_data(&op_data, dir, NULL, name, len, 0,
+ LUSTRE_OPC_MKDIR);
- /* allocate new fid */
- err = llu_fid_md_alloc(llu_i2sbi(dir), &op_data.op_fid2, &hint);
- if (err) {
- CERROR("can't allocate new fid, rc %d\n", err);
- RETURN(err);
- }
err = md_create(llu_i2sbi(dir)->ll_md_exp, &op_data, NULL, 0, mode,
current->fsuid, current->fsgid, current->cap_effective,
0, &request);
(long long)llu_i2stat(dir)->st_ino,
llu_i2info(dir)->lli_st_generation, dir);
- llu_prep_md_op_data(&op_data, dir, NULL, name, len, S_IFDIR);
+ llu_prep_md_op_data(&op_data, dir, NULL, name, len, S_IFDIR,
+ LUSTRE_OPC_ANY);
rc = md_unlink(llu_i2sbi(dir)->ll_md_exp, &op_data, &request);
ptlrpc_req_finished(request);
struct lov_user_md lum, *lump = (struct lov_user_md *)arg;
int rc = 0;
- llu_prep_md_op_data(&op_data, ino, NULL, NULL, 0, 0);
+ llu_prep_md_op_data(&op_data, ino, NULL, NULL, 0, 0,
+ LUSTRE_OPC_ANY);
LASSERT(sizeof(lum) == sizeof(*lump));
LASSERT(sizeof(lum.lmm_objects[0]) ==
lli2->lli_symlink_name = NULL;
ino->i_private = lli2;
- llu_prep_md_op_data(&data, NULL, ino, NULL, 0, O_RDWR);
+ llu_prep_md_op_data(&data, NULL, ino, NULL, 0, O_RDWR,
+ LUSTRE_OPC_ANY);
rc = md_enqueue(sbi->ll_md_exp, LDLM_IBITS, &oit, LCK_CR, &data,
&lockh, lum, lum_size, ldlm_completion_ast,
parent = de->d_parent->d_inode;
if (it->it_op & IT_CREAT) {
- /*
- * Allocate new fid for case of create or open(O_CREAT). In both
- * cases it->it_op will contain IT_CREAT. In case of
- * open(O_CREAT) agains existing file, fid allocating is not
- * needed, but this is not known until server returns
- * anything. Well, in this case new allocated fid is lost. But
- * this is not big deal, we have 64bit fids. --umka
- */
- struct lu_placement_hint hint = { .ph_pname = NULL,
- .ph_pfid = ll_inode2fid(parent),
- .ph_cname = &de->d_name,
- .ph_opc = LUSTRE_OPC_CREATE };
-
- op_data = ll_prep_md_op_data(NULL, parent, NULL,
- de->d_name.name, de->d_name.len, 0);
+ op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
+ de->d_name.len, 0, LUSTRE_OPC_CREATE);
if (op_data == NULL)
RETURN(-ENOMEM);
- rc = ll_fid_md_alloc(ll_i2sbi(parent), &op_data->op_fid2, &hint);
- if (rc) {
- ll_finish_md_op_data(op_data);
- RETURN(rc);
- }
} else {
op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
- de->d_name.name, de->d_name.len, 0);
+ de->d_name.name, de->d_name.len,
+ 0, LUSTRE_OPC_ANY);
if (op_data == NULL)
RETURN(-ENOMEM);
}
it = &lookup_it;
}
- /* do real lookup here */
- op_data = ll_prep_md_op_data(NULL, parent, NULL,
- de->d_name.name, de->d_name.len, 0);
+ /* Do real lookup here. */
+ op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
+ de->d_name.len, 0, (it->it_op & IT_CREAT ?
+ LUSTRE_OPC_CREATE :
+ LUSTRE_OPC_ANY));
if (op_data == NULL)
RETURN(-ENOMEM);
- if (it->it_op & IT_CREAT) {
- /*
- * Allocate new fid for case of create or open with O_CREAT. In
- * both cases it->it_op will contain IT_CREAT.
- */
- struct lu_placement_hint hint = { .ph_pname = NULL,
- .ph_pfid = ll_inode2fid(parent),
- .ph_cname = &de->d_name,
- .ph_opc = LUSTRE_OPC_CREATE };
-
- rc = ll_fid_md_alloc(ll_i2sbi(parent), &op_data->op_fid2, &hint);
- if (rc) {
- ll_finish_md_op_data(op_data);
- RETURN(rc);
- }
- }
-
rc = md_intent_lock(exp, op_data, NULL, 0, it, 0, &req,
ll_md_blocking_ast, 0);
if (rc >= 0) {
struct ptlrpc_request *request;
struct md_op_data *op_data;
- op_data = ll_prep_md_op_data(NULL, dir, NULL, NULL, 0, 0);
+ op_data = ll_prep_md_op_data(NULL, dir, NULL, NULL, 0, 0,
+ LUSTRE_OPC_ANY);
if (op_data == NULL)
return ERR_PTR(-ENOMEM);
lustre_swab_lov_user_md(&lum);
/* swabbing is done in lov_setstripe() on server side */
- op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0);
+ op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
+ LUSTRE_OPC_ANY);
if (op_data == NULL)
RETURN(-ENOMEM);
rc = md_setattr(sbi->ll_md_exp, op_data, &lum,
itp->it_flags |= MDS_OPEN_LOCK;
op_data = ll_prep_md_op_data(NULL, parent->d_inode, NULL, name, len,
- O_RDWR);
+ O_RDWR, LUSTRE_OPC_ANY);
if (op_data == NULL)
RETURN(-ENOMEM);
op_data = ll_prep_md_op_data(NULL, head_inode, tail_parent,
tail_dentry->d_name.name,
- tail_dentry->d_name.len, 0);
+ tail_dentry->d_name.len, 0,
+ LUSTRE_OPC_ANY);
if (op_data == NULL)
RETURN(-ENOMEM);
/* Call getattr by fid, so do not provide name at all. */
op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
- dentry->d_inode, NULL, 0, 0);
+ dentry->d_inode, NULL, 0, 0,
+ LUSTRE_OPC_ANY);
if (op_data == NULL)
RETURN(-ENOMEM);
it->it_flags |= O_CHECK_STALE;
#include <lustre_disk.h>
#include "llite_internal.h"
-static int ll_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
- struct lu_placement_hint *hint)
-{
- int rc;
- ENTRY;
-
- rc = obd_fid_alloc(exp, fid, hint);
- if (rc) {
- CERROR("Can't allocate new fid, rc %d\n", rc);
- RETURN(rc);
- }
-
- LASSERT(fid_is_sane(fid));
- RETURN(rc);
-}
-
-/* allocates passed fid, that is assigns f_num and f_seq to the @fid */
-int ll_fid_md_alloc(struct ll_sb_info *sbi, struct lu_fid *fid,
- struct lu_placement_hint *hint)
-{
- ENTRY;
- RETURN(ll_fid_alloc(sbi->ll_md_exp, fid, hint));
-}
-
-/* allocates passed fid, that is assigns f_num and f_seq to the @fid */
-int ll_fid_dt_alloc(struct ll_sb_info *sbi, struct lu_fid *fid,
- struct lu_placement_hint *hint)
-{
- ENTRY;
- RETURN(ll_fid_alloc(sbi->ll_dt_exp, fid, hint));
-}
-
-/* build inode number on passed @fid */
+/* Build inode number on passed @fid */
ino_t ll_fid_build_ino(struct ll_sb_info *sbi,
struct lu_fid *fid)
{
struct md_op_data *ll_prep_md_op_data(struct md_op_data *op_data,
struct inode *i1, struct inode *i2,
const char *name, int namelen,
- int mode);
+ int mode, __u32 opc);
void ll_finish_md_op_data(struct md_op_data *op_data);
/* llite/llite_nfs.c */
int lustre_check_remote_perm(struct inode *inode, int mask);
/* llite/llite_fid.c */
-int ll_fid_md_alloc(struct ll_sb_info *sbi, struct lu_fid *fid,
- struct lu_placement_hint *hint);
-
-int ll_fid_dt_alloc(struct ll_sb_info *sbi, struct lu_fid *fid,
- struct lu_placement_hint *hint);
-
ino_t ll_fid_build_ino(struct ll_sb_info *sbi, struct lu_fid *fid);
/* llite/llite_capa.c */
}
sbi->ll_md_exp = class_conn2export(&md_conn);
+ err = obd_fid_init(sbi->ll_md_exp);
+ if (err) {
+ CERROR("Can't init metadata layer FID infrastructure, "
+ "rc %d\n", err);
+ GOTO(out_md, err);
+ }
+
err = obd_statfs(obd, &osfs, cfs_time_current_64() - HZ);
if (err)
- GOTO(out_md, err);
+ GOTO(out_md_fid, err);
size = sizeof(*data);
- err = obd_get_info(sbi->ll_md_exp, strlen(KEY_CONN_DATA), KEY_CONN_DATA,
- &size, data);
+ err = obd_get_info(sbi->ll_md_exp, strlen(KEY_CONN_DATA),
+ KEY_CONN_DATA, &size, data);
if (err) {
CERROR("Get connect data failed: %d \n", err);
GOTO(out_md, err);
obd = class_name2obd(dt);
if (!obd) {
CERROR("DT %s: not setup or attached\n", dt);
- GOTO(out_md, err = -ENODEV);
+ GOTO(out_md_fid, err = -ENODEV);
}
data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION |
LCONSOLE_ERROR("An OST (dt %s) is performing recovery, of which this"
" client is not a part. Please wait for recovery to "
"complete, abort, or time out.\n", dt);
- GOTO(out, err);
+ GOTO(out_md_fid, err);
} else if (err) {
- CERROR("cannot connect to %s: rc = %d\n", dt, err);
- GOTO(out_md, err);
+ CERROR("Cannot connect to %s: rc = %d\n", dt, err);
+ GOTO(out_md_fid, err);
}
sbi->ll_dt_exp = class_conn2export(&dt_conn);
+ err = obd_fid_init(sbi->ll_dt_exp);
+ if (err) {
+ CERROR("Can't init data layer FID infrastructure, "
+ "rc %d\n", err);
+ GOTO(out_dt, err);
+ }
+
spin_lock(&sbi->ll_lco.lco_lock);
sbi->ll_lco.lco_flags = data->ocd_connect_flags;
spin_unlock(&sbi->ll_lco.lco_lock);
LCONSOLE_ERROR("There are no OST's in this filesystem. "
"There must be at least one active OST for "
"a client to start.\n");
- GOTO(out_dt, err);
+ GOTO(out_dt_fid, err);
}
if (!ll_async_page_slab) {
ll_async_page_slab_size,
0, 0, NULL, NULL);
if (!ll_async_page_slab)
- GOTO(out_dt, err = -ENOMEM);
+ GOTO(out_dt_fid, err = -ENOMEM);
}
err = md_getstatus(sbi->ll_md_exp, &rootfid, &oc);
if (err) {
CERROR("cannot mds_connect: rc = %d\n", err);
- GOTO(out_dt, err);
+ GOTO(out_dt_fid, err);
}
CDEBUG(D_SUPER, "rootfid "DFID"\n", PFID(&rootfid));
sbi->ll_root_fid = rootfid;
free_capa(oc);
if (err) {
CERROR("md_getattr failed for root: rc = %d\n", err);
- GOTO(out_dt, err);
+ GOTO(out_dt_fid, err);
}
err = md_get_lustre_md(sbi->ll_md_exp, request,
if (err) {
CERROR("failed to understand root inode md: rc = %d\n", err);
ptlrpc_req_finished (request);
- GOTO(out_dt, err);
+ GOTO(out_dt_fid, err);
}
LASSERT(fid_is_sane(&sbi->ll_root_fid));
OBD_FREE(data, sizeof(*data));
sb->s_root->d_op = &ll_d_root_ops;
RETURN(err);
-
out_root:
if (root)
iput(root);
+out_dt_fid:
+ obd_fid_fini(sbi->ll_dt_exp);
out_dt:
obd_disconnect(sbi->ll_dt_exp);
sbi->ll_dt_exp = NULL;
+out_md_fid:
+ obd_fid_fini(sbi->ll_md_exp);
out_md:
obd_disconnect(sbi->ll_md_exp);
sbi->ll_md_exp = NULL;
if (data != NULL)
OBD_FREE_PTR(data);
lprocfs_unregister_mountpoint(sbi);
- RETURN(err);
+ return err;
}
int ll_get_max_mdsize(struct ll_sb_info *sbi, int *lmmsize)
prune_deathrow(sbi, 0);
list_del(&sbi->ll_conn_chain);
+
+ obd_fid_fini(sbi->ll_dt_exp);
obd_disconnect(sbi->ll_dt_exp);
sbi->ll_dt_exp = NULL;
lprocfs_unregister_mountpoint(sbi);
+ obd_fid_fini(sbi->ll_md_exp);
obd_disconnect(sbi->ll_md_exp);
sbi->ll_md_exp = NULL;
int rc;
ENTRY;
- op_data = ll_prep_md_op_data(op_data, inode, NULL, NULL, 0, 0);
- rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, NULL, 0, &request);
+ op_data = ll_prep_md_op_data(op_data, inode, NULL, NULL, 0, 0,
+ LUSTRE_OPC_ANY);
+ rc = md_setattr(sbi->ll_md_exp, op_data, NULL, 0, NULL, 0,
+ &request);
if (rc) {
ptlrpc_req_finished(request);
if (rc == -ENOENT) {
if (!oinfo.oi_oa)
RETURN(-ENOMEM);
- op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0);
+ op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
+ LUSTRE_OPC_ANY);
if (op_data == NULL)
RETURN(-ENOMEM);
/* this function prepares md_op_data hint for passing ot down to MD stack. */
struct md_op_data *
ll_prep_md_op_data(struct md_op_data *op_data, struct inode *i1,
- struct inode *i2, const char *name, int namelen, int mode)
+ struct inode *i2, const char *name, int namelen,
+ int mode, __u32 opc)
{
LASSERT(i1 != NULL);
op_data->op_fid1 = *ll_inode2fid(i1);
op_data->op_capa1 = ll_mdscapa_get(i1);
- /* @i2 may be NULL. In this case caller itself has to initialize ->fid2
- * if needed. */
if (i2) {
op_data->op_fid2 = *ll_inode2fid(i2);
op_data->op_capa2 = ll_mdscapa_get(i2);
+ } else {
+ fid_zero(&op_data->op_fid2);
}
op_data->op_name = name;
op_data->op_fsgid = current->fsgid;
op_data->op_cap = current->cap_effective;
op_data->op_bias = MDS_CHECK_SPLIT;
+ op_data->op_opc = opc;
return op_data;
}
struct ptlrpc_request *req = NULL;
struct md_op_data *op_data;
struct it_cb_data icbd;
+ __u32 opc;
int rc;
ENTRY;
icbd.icbd_childp = &dentry;
icbd.icbd_parent = parent;
- /* prepare operatoin hint first */
-
+ if (it->it_op & IT_CREAT ||
+ (it->it_op & IT_OPEN && it->it_create_mode & O_CREAT))
+ opc = LUSTRE_OPC_CREATE;
+ else
+ opc = LUSTRE_OPC_ANY;
+
op_data = ll_prep_md_op_data(NULL, parent, NULL, dentry->d_name.name,
- dentry->d_name.len, lookup_flags);
+ dentry->d_name.len, lookup_flags, opc);
if (op_data == NULL)
RETURN(ERR_PTR(-ENOMEM));
- /* allocate new fid for child */
- if (it->it_op & IT_CREAT ||
- (it->it_op & IT_OPEN && it->it_create_mode & O_CREAT)) {
- struct lu_placement_hint hint = { .ph_pname = NULL,
- .ph_pfid = ll_inode2fid(parent),
- .ph_cname = &dentry->d_name,
- .ph_opc = LUSTRE_OPC_CREATE };
-
- rc = ll_fid_md_alloc(ll_i2sbi(parent), &op_data->op_fid2, &hint);
- if (rc) {
- ll_finish_md_op_data(op_data);
- LASSERT(rc < 0);
- RETURN(ERR_PTR(rc));
- }
- }
-
it->it_create_mode &= ~current->fs->umask;
rc = md_intent_lock(ll_i2mdexp(parent), op_data, NULL, 0, it,
static int ll_mknod_generic(struct inode *dir, struct qstr *name, int mode,
unsigned rdev, struct dentry *dchild)
{
- struct ptlrpc_request *request = NULL;
- struct inode *inode = NULL;
struct ll_sb_info *sbi = ll_i2sbi(dir);
+ struct ptlrpc_request *request = NULL;
struct md_op_data *op_data;
- struct lu_placement_hint hint = {
- .ph_pname = NULL,
- .ph_pfid = ll_inode2fid(dir),
- .ph_cname = name,
- .ph_opc = LUSTRE_OPC_MKNOD
- };
+ struct inode *inode = NULL;
int err;
ENTRY;
case S_IFIFO:
case S_IFSOCK:
op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name,
- name->len, 0);
+ name->len, 0, LUSTRE_OPC_MKNOD);
if (op_data == NULL)
RETURN(-ENOMEM);
- err = ll_fid_md_alloc(sbi, &op_data->op_fid2, &hint);
- if (err) {
- ll_finish_md_op_data(op_data);
- break;
- }
+
err = md_create(sbi->ll_md_exp, op_data, NULL, 0, mode,
current->fsuid, current->fsgid,
current->cap_effective, rdev, &request);
const char *tgt)
{
struct qstr *name = &dchild->d_name;
- struct lu_placement_hint hint = { .ph_pname = NULL,
- .ph_pfid = ll_inode2fid(dir),
- .ph_cname = name,
- .ph_opc = LUSTRE_OPC_SYMLINK };
-
struct ptlrpc_request *request = NULL;
struct ll_sb_info *sbi = ll_i2sbi(dir);
struct inode *inode = NULL;
name->len, name->name, dir->i_ino, dir->i_generation,
dir, tgt);
- op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name, name->len, 0);
+ op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name,
+ name->len, 0, LUSTRE_OPC_SYMLINK);
if (op_data == NULL)
RETURN(-ENOMEM);
- /* allocate new fid */
- err = ll_fid_md_alloc(ll_i2sbi(dir), &op_data->op_fid2, &hint);
- if (err) {
- ll_finish_md_op_data(op_data);
- RETURN(err);
- }
-
- err = md_create(sbi->ll_md_exp, op_data,
- tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO,
- current->fsuid, current->fsgid, current->cap_effective,
- 0, &request);
+ err = md_create(sbi->ll_md_exp, op_data, tgt, strlen(tgt) + 1,
+ S_IFLNK | S_IRWXUGO, current->fsuid, current->fsgid,
+ current->cap_effective, 0, &request);
ll_finish_md_op_data(op_data);
if (err == 0) {
ll_update_times(request, REPLY_REC_OFF, dir);
src->i_ino, src->i_generation, src, dir->i_ino,
dir->i_generation, dir, name->len, name->name);
- op_data = ll_prep_md_op_data(NULL, src, dir, name->name, name->len, 0);
+ op_data = ll_prep_md_op_data(NULL, src, dir, name->name, name->len,
+ 0, LUSTRE_OPC_ANY);
if (op_data == NULL)
RETURN(-ENOMEM);
err = md_link(sbi->ll_md_exp, op_data, &request);
int mode, struct dentry *dchild)
{
- struct lu_placement_hint hint = { .ph_pname = NULL,
- .ph_pfid = ll_inode2fid(dir),
- .ph_cname = name,
- .ph_opc = LUSTRE_OPC_MKDIR };
struct ptlrpc_request *request = NULL;
struct ll_sb_info *sbi = ll_i2sbi(dir);
struct inode *inode = NULL;
mode = (mode & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR;
- op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name, name->len, 0);
+ op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name, name->len,
+ 0, LUSTRE_OPC_MKDIR);
if (op_data == NULL)
RETURN(-ENOMEM);
- /* Allocate new fid. */
- err = ll_fid_md_alloc(ll_i2sbi(dir), &op_data->op_fid2, &hint);
- if (err) {
- ll_finish_md_op_data(op_data);
- RETURN(err);
- }
-
err = md_create(sbi->ll_md_exp, op_data, NULL, 0, mode,
current->fsuid, current->fsgid,
current->cap_effective, 0, &request);
}
op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name, name->len,
- S_IFDIR);
+ S_IFDIR, LUSTRE_OPC_ANY);
if (op_data == NULL)
RETURN(-ENOMEM);
rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request);
CDEBUG(D_VFSTRACE, "VFS Op:name=%.*s,dir=%lu/%u(%p)\n",
name->len, name->name, dir->i_ino, dir->i_generation, dir);
- op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name, name->len, 0);
+ op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name,
+ name->len, 0, LUSTRE_OPC_ANY);
if (op_data == NULL)
RETURN(-ENOMEM);
rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request);
src->i_ino, src->i_generation, src, tgt_name->len,
tgt_name->name, tgt->i_ino, tgt->i_generation, tgt);
- op_data = ll_prep_md_op_data(NULL, src, tgt, NULL, 0, 0);
+ op_data = ll_prep_md_op_data(NULL, src, tgt, NULL, 0, 0,
+ LUSTRE_OPC_ANY);
if (op_data == NULL)
RETURN(-ENOMEM);
err = md_rename(sbi->ll_md_exp, op_data,
obj = lmv_obj_grab(obd, &rpid);
if (obj) {
int mea_idx;
+
/*
* Directory is already split, so we have to forward request to
* the right MDS.
sop_data->op_fid1 = rpid;
+ if (it->it_op & IT_CREAT) {
+ /*
+ * For open with IT_CREATE and for IT_CREATE cases allocate new
+ * fid and setup FLD for it.
+ */
+ rc = lmv_fid_alloc(exp, &sop_data->op_fid2, sop_data);
+ if (rc)
+ GOTO(out_free_sop_data, rc);
+ }
+
rc = md_intent_lock(tgt_exp, sop_data, lmm, lmmsize, it, flags,
reqp, cb_blocking, extra_lock_flags);
ptlrpc_req_finished(*reqp);
*reqp = NULL;
it->d.lustre.it_data = 0;
+
/*
* Directory got split. Time to update local object and repeat
* the request with proper MDS.
mdsno_t *mds);
int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
mdsno_t mds);
+int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+ struct md_op_data *op_data);
int lmv_alloc_slave_fids(struct obd_device *obd, struct lu_fid *pid,
struct md_op_data *op, struct lu_fid *fid);
RETURN(1);
}
-static int lmv_all_chars_policy(int count, struct qstr *name)
+static int lmv_all_chars_policy(int count, const char *name,
+ int len)
{
unsigned int c = 0;
- unsigned int len = name->len;
while (len > 0)
- c += name->name[-- len];
+ c += name[--len];
c = c % count;
return c;
}
static int lmv_placement_policy(struct obd_device *obd,
- struct lu_placement_hint *hint,
+ struct md_op_data *op_data,
mdsno_t *mds)
{
struct lmv_obd *lmv = &obd->u.lmv;
* balanced, that is all sequences have more or less equal
* number of objects created.
*/
- obj = lmv_obj_grab(obd, hint->ph_pfid);
+ obj = lmv_obj_grab(obd, &op_data->op_fid1);
if (obj) {
struct lu_fid *rpid;
int mea_idx;
*/
mea_idx = raw_name2idx(obj->lo_hashtype,
obj->lo_objcount,
- hint->ph_cname->name,
- hint->ph_cname->len);
+ op_data->op_name,
+ op_data->op_namelen);
rpid = &obj->lo_inodes[mea_idx].li_fid;
*mds = obj->lo_inodes[mea_idx].li_mds;
lmv_obj_put(obj);
rc = 0;
- CDEBUG(D_INODE, "The obj "DFID" has been split, got "
- "MDS at "LPU64" by name %s\n", PFID(hint->ph_pfid),
- *mds, hint->ph_cname->name);
- } else if (hint->ph_cname && (hint->ph_opc == LUSTRE_OPC_MKDIR)) {
+ CDEBUG(D_INODE, "The obj "DFID" has been split, got MDS at "
+ LPU64" by name %s\n", PFID(&op_data->op_fid1), *mds,
+ op_data->op_name);
+ } else if (op_data->op_name && (op_data->op_opc == LUSTRE_OPC_MKDIR)) {
/* Default policy for directories. */
*mds = lmv_all_chars_policy(lmv->desc.ld_tgt_count,
- hint->ph_cname);
+ op_data->op_name,
+ op_data->op_namelen);
rc = 0;
} else {
/*
* Default policy for others is to use parent MDS.
* ONLY directories can be cross-ref during creation.
*/
- rc = lmv_fld_lookup(lmv, hint->ph_pfid, mds);
+ rc = lmv_fld_lookup(lmv, &op_data->op_fid1, mds);
}
} else {
/*
RETURN(rc);
}
-static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
- struct lu_placement_hint *hint)
+int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+ struct md_op_data *op_data)
{
struct obd_device *obd = class_exp2obd(exp);
struct lmv_obd *lmv = &obd->u.lmv;
int rc;
ENTRY;
- LASSERT(hint != NULL);
+ LASSERT(op_data != NULL);
LASSERT(fid != NULL);
- rc = lmv_placement_policy(obd, hint, &mds);
+ rc = lmv_placement_policy(obd, op_data, &mds);
if (rc) {
- CERROR("Can't get target for allocating fid, rc %d\n", rc);
+ CERROR("Can't get target for allocating fid, "
+ "rc %d\n", rc);
RETURN(rc);
}
if (IS_ERR(tgt_exp))
RETURN(PTR_ERR(tgt_exp));
+ rc = lmv_fid_alloc(exp, &op_data->op_fid2, op_data);
+ if (rc)
+ RETURN(rc);
+
CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->op_namelen,
op_data->op_name, PFID(&op_data->op_fid1));
if (rc == 0) {
if (*request == NULL)
RETURN(rc);
- CDEBUG(D_OTHER, "created. "DFID"\n", PFID(&op_data->op_fid1));
+ CDEBUG(D_OTHER, "created - "DFID"\n", PFID(&op_data->op_fid1));
} else if (rc == -ERESTART) {
LASSERT(*request != NULL);
DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
"Got -ERESTART during create!\n");
ptlrpc_req_finished(*request);
*request = NULL;
+
/*
* Directory got split. Time to update local object and repeat
* the request with proper MDS.
.o_packmd = lmv_packmd,
.o_unpackmd = lmv_unpackmd,
.o_notify = lmv_notify,
- .o_fid_alloc = lmv_fid_alloc,
- .o_fid_delete = lmv_fid_delete,
- .o_iocontrol = lmv_iocontrol
+ .o_iocontrol = lmv_iocontrol,
+ .o_fid_delete = lmv_fid_delete
};
struct md_ops lmv_md_ops = {
void *cb_data, int extra_lock_flags);
/* mdc/mdc_request.c */
+int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+ struct md_op_data *op_data);
+
int mdc_init_ea_size(struct obd_export *exp, int easize, int def_easzie,
int cookiesize);
* this and use the request from revalidate. In this case, revalidate
* never dropped its reference, so the refcounts are all OK */
if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
+ /* For case if upper layer did not alloc fid, do it now. */
+ if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
+ rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
+ if (rc < 0) {
+ CERROR("Can't alloc new fid, rc %d\n", rc);
+ RETURN(rc);
+ }
+ }
+
rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it),
op_data, &lockh, lmm, lmmsize,
ldlm_completion_ast, cb_blocking, NULL,
if (rc < 0)
RETURN(rc);
memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
- } else if (!fid_is_sane(&op_data->op_fid2) ||
- !(it->it_flags & O_CHECK_STALE)) {
+ } else if (!fid_is_sane(&op_data->op_fid2) || !(it->it_flags & O_CHECK_STALE)) {
/* DISP_ENQ_COMPLETE set means there is extra reference on
* request referenced from this intent, saved for subsequent
* lookup. This path is executed when we proceed to this
struct ptlrpc_request *req;
ENTRY;
+ /* For case if upper layer did not alloc fid, do it now. */
+ if (!fid_is_sane(&op_data->op_fid2)) {
+ /*
+ * mdc_fid_alloc() may return errno 1 in case of switch to new
+ * sequence, handle this.
+ */
+ rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
+ if (rc < 0) {
+ CERROR("Can't alloc new fid, rc %d\n", rc);
+ RETURN(rc);
+ }
+ }
+
size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
- sizeof(struct lustre_capa) : 0;
+ sizeof(struct lustre_capa) : 0;
+
if (data && datalen) {
size[bufcount] = datalen;
bufcount++;
snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s",
exp->exp_obd->obd_name);
- /* init client side sequence-manager */
+ /* Init client side sequence-manager */
rc = seq_client_init(cli->cl_seq, exp,
LUSTRE_SEQ_METADATA,
prefix, NULL);
GOTO(out_free_seq, rc);
RETURN(rc);
-
out_free_seq:
OBD_FREE_PTR(cli->cl_seq);
cli->cl_seq = NULL;
RETURN(0);
}
-static int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
- struct lu_placement_hint *hint)
+int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+ struct md_op_data *op_data)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
struct lu_client_seq *seq = cli->cl_seq;
-
ENTRY;
RETURN(seq_client_alloc_fid(seq, fid));
}
mdt_seq_fini(env, m);
mdt_seq_fini_cli(m);
mdt_fld_fini(env, m);
+ mdt_procfs_fini(m);
ptlrpc_lprocfs_unregister_obd(d->ld_obd);
lprocfs_obd_cleanup(d->ld_obd);
rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
if (rc) {
- CERROR("can't init lu_site, rc %d\n", rc);
+ CERROR("Can't init lu_site, rc %d\n", rc);
GOTO(err_free_site, rc);
}
lprocfs_init_vars(mdt, &lvars);
rc = lprocfs_obd_setup(obd, lvars.obd_vars);
if (rc) {
- CERROR("can't init lprocfs, rc %d\n", rc);
+ CERROR("Can't init lprocfs, rc %d\n", rc);
GOTO(err_fini_site, rc);
}
ptlrpc_lprocfs_register_obd(obd);
+ rc = mdt_procfs_init(m, dev);
+ if (rc) {
+ CERROR("Can't init MDT lprocfs, rc %d\n", rc);
+ GOTO(err_fini_proc, rc);
+ }
+
/* set server index */
LASSERT(num);
s->ls_node_id = simple_strtol(num, NULL, 10);
/* init the stack */
rc = mdt_stack_init(env, m, cfg);
if (rc) {
- CERROR("can't init device stack, rc %d\n", rc);
+ CERROR("Can't init device stack, rc %d\n", rc);
GOTO(err_fini_proc, rc);
}
err_fini_stack:
mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
err_fini_proc:
+ mdt_procfs_fini(m);
lprocfs_obd_cleanup(obd);
err_fini_site:
lu_site_fini(s);
struct ptlrpc_thread mdt_ck_thread;
struct lustre_capa_key mdt_capa_keys[2];
unsigned int mdt_capa_conf:1;
+
+ cfs_proc_dir_entry_t *mdt_proc_entry;
+ struct lprocfs_stats *mdt_stats;
};
/*XXX copied from mds_internal.h */
struct txn_param mti_txn_param;
struct lu_buf mti_buf;
struct lustre_capa_key mti_capa_key;
+
+ /* Time for stats */
+ struct timeval mti_time;
};
/*
* Info allocated per-transaction.
return mdt_dlm_lock_modes[mode];
}
-/*
- * Capability
- */
+/* lprocfs stuff */
+int mdt_procfs_init(struct mdt_device *mdt, const char *name);
+int mdt_procfs_fini(struct mdt_device *mdt);
+
+void mdt_lprocfs_time_start(struct mdt_device *mdt,
+ struct timeval *start, int op);
+
+void mdt_lprocfs_time_end(struct mdt_device *mdt,
+ struct timeval *start, int op);
+
+enum {
+ LPROC_MDT_REINT_CREATE = 0,
+ LPROC_MDT_REINT_OPEN,
+ LPROC_MDT_REINT_LINK,
+ LPROC_MDT_REINT_UNLINK,
+ LPROC_MDT_REINT_RENAME,
+ LPROC_MDT_REINT_SETATTR,
+ LPROC_MDT_GETATTR,
+ LPROC_MDT_GETATTR_NAME,
+ LPROC_MDT_INTENT_GETATTR,
+ LPROC_MDT_INTENT_REINT,
+ LPROC_MDT_LAST
+};
+
+/* Capability */
int mdt_ck_thread_start(struct mdt_device *mdt);
void mdt_ck_thread_stop(struct mdt_device *mdt);
void mdt_ck_timer_callback(unsigned long castmeharder);
#include <lprocfs_status.h>
#include "mdt_internal.h"
+static int mdt_procfs_init_stats(struct mdt_device *mdt, int num_stats)
+{
+ struct lprocfs_stats *stats;
+ int rc;
+ ENTRY;
+
+ stats = lprocfs_alloc_stats(num_stats);
+ if (!stats)
+ RETURN(-ENOMEM);
+
+ rc = lprocfs_register_stats(mdt->mdt_proc_entry, "stats", stats);
+ if (rc != 0)
+ GOTO(cleanup, rc);
+
+ mdt->mdt_stats = stats;
+
+ lprocfs_counter_init(mdt->mdt_stats, LPROC_MDT_REINT_CREATE,
+ LPROCFS_CNTR_AVGMINMAX, "reint_create", "time");
+ lprocfs_counter_init(mdt->mdt_stats, LPROC_MDT_REINT_OPEN,
+ LPROCFS_CNTR_AVGMINMAX, "reint_open", "time");
+ lprocfs_counter_init(mdt->mdt_stats, LPROC_MDT_REINT_LINK,
+ LPROCFS_CNTR_AVGMINMAX, "reint_link", "time");
+ lprocfs_counter_init(mdt->mdt_stats, LPROC_MDT_REINT_UNLINK,
+ LPROCFS_CNTR_AVGMINMAX, "reint_unlink", "time");
+ lprocfs_counter_init(mdt->mdt_stats, LPROC_MDT_REINT_SETATTR,
+ LPROCFS_CNTR_AVGMINMAX, "reint_setattr", "time");
+ lprocfs_counter_init(mdt->mdt_stats, LPROC_MDT_REINT_RENAME,
+ LPROCFS_CNTR_AVGMINMAX, "reint_rename", "time");
+ lprocfs_counter_init(mdt->mdt_stats, LPROC_MDT_GETATTR,
+ LPROCFS_CNTR_AVGMINMAX, "getattr", "time");
+ lprocfs_counter_init(mdt->mdt_stats, LPROC_MDT_GETATTR_NAME,
+ LPROCFS_CNTR_AVGMINMAX, "getattr_name", "time");
+ lprocfs_counter_init(mdt->mdt_stats, LPROC_MDT_INTENT_GETATTR,
+ LPROCFS_CNTR_AVGMINMAX, "intent_getattr", "time");
+ lprocfs_counter_init(mdt->mdt_stats, LPROC_MDT_INTENT_REINT,
+ LPROCFS_CNTR_AVGMINMAX, "intent_reint", "time");
+ EXIT;
+cleanup:
+ if (rc) {
+ lprocfs_free_stats(stats);
+ mdt->mdt_stats = NULL;
+ }
+ return rc;
+}
+
+int mdt_procfs_init(struct mdt_device *mdt, const char *name)
+{
+ struct lu_device *ld = &mdt->mdt_md_dev.md_lu_dev;
+ int rc;
+ ENTRY;
+
+ LASSERT(name != NULL);
+ mdt->mdt_proc_entry = ld->ld_obd->obd_proc_entry;
+ LASSERT(mdt->mdt_proc_entry != NULL);
+
+ rc = mdt_procfs_init_stats(mdt, LPROC_MDT_LAST);
+ return rc;
+}
+
+int mdt_procfs_fini(struct mdt_device *mdt)
+{
+ if (mdt->mdt_stats) {
+ lprocfs_free_stats(mdt->mdt_stats);
+ mdt->mdt_stats = NULL;
+ }
+ if (mdt->mdt_proc_entry)
+ mdt->mdt_proc_entry = NULL;
+ RETURN(0);
+}
+
+void mdt_lprocfs_time_start(struct mdt_device *mdt,
+ struct timeval *start, int op)
+{
+ do_gettimeofday(start);
+}
+
+void mdt_lprocfs_time_end(struct mdt_device *mdt,
+ struct timeval *start, int op)
+{
+ struct timeval end;
+ long timediff;
+
+ do_gettimeofday(&end);
+ timediff = cfs_timeval_sub(&end, start, NULL);
+
+ if (mdt->mdt_stats)
+ lprocfs_counter_add(mdt->mdt_stats, op, timediff);
+ return;
+}
static int lprocfs_rd_identity_expire(char *page, char **start, off_t off,
int count, int *eof, void *data)
str[len - 1] = '\0';
}
-/* FIXME: this macro is copied from lnet/libcfs/nidstring.c */
+/* XXX: This macro is copied from lnet/libcfs/nidstring.c */
#define LNET_NIDSTR_SIZE 32 /* size of each one (see below for usage) */
+
static void do_process_nosquash_nids(struct mdt_device *m, char *buf)
{
struct rootsquash_info *rsi = m->mdt_rootsquash_info;
int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
{
- struct mdt_device *mdt = info->mti_mdt;
- struct ptlrpc_request *req = mdt_info_req(info);
- struct mdt_object *parent;
- struct mdt_object *child;
- struct mdt_lock_handle *lh;
- struct ldlm_reply *ldlm_rep;
- struct mdt_body *repbody;
- struct lu_fid *child_fid = &info->mti_tmp_fid1;
- struct md_attr *ma = &info->mti_attr;
- struct lu_attr *la = &ma->ma_attr;
- __u32 create_flags = info->mti_spec.sp_cr_flags;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ struct mdt_object *parent;
+ struct mdt_object *child;
+ struct mdt_lock_handle *lh;
+ struct ldlm_reply *ldlm_rep;
+ struct mdt_body *repbody;
+ struct lu_fid *child_fid = &info->mti_tmp_fid1;
+ struct md_attr *ma = &info->mti_attr;
+ struct lu_attr *la = &ma->ma_attr;
+ __u32 create_flags = info->mti_spec.sp_cr_flags;
struct mdt_reint_record *rr = &info->mti_rr;
- int result;
- int created = 0;
+ int result;
+ int created = 0;
ENTRY;
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PAUSE_OPEN | OBD_FAIL_ONCE,
(obd_timeout + 1) / 4);
+ mdt_lprocfs_time_start(info->mti_mdt, &info->mti_time, LPROC_MDT_REINT_OPEN);
repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1, 1, 1);
if (result)
lustre_msg_set_transno(req->rq_repmsg, 0);
+ mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
+ LPROC_MDT_REINT_OPEN);
return result;
}
int rc;
ENTRY;
+ mdt_lprocfs_time_start(info->mti_mdt, &info->mti_time,
+ LPROC_MDT_REINT_CREATE);
+
if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
RETURN(err_serious(-ESTALE));
default:
rc = err_serious(-EOPNOTSUPP);
}
+ mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
+ LPROC_MDT_REINT_CREATE);
RETURN(rc);
}