- LMV manages fids and llite asks it to allocate new ones.
#ifdef __KERNEL__
# include <linux/types.h>
-# include <linux/fs.h> /* to check for FMODE_EXEC, dev_t, lest we redefine */
+# include <linux/fs.h> /* to check for FMODE_EXEC, dev_t, lest we redefine */
# ifdef CONFIG_FS_POSIX_ACL
# include <linux/xattr_acl.h>
# endif
#ifdef __KERNEL__
# include <linux/fs.h>
# include <linux/list.h>
-# include <linux/sched.h> /* for struct task_struct, for current.h */
-# include <asm/current.h> /* for smp_lock.h */
+# include <linux/sched.h> /* for struct task_struct, for current.h */
+# include <asm/current.h> /* for smp_lock.h */
# include <linux/smp_lock.h>
# include <linux/proc_fs.h>
# include <linux/mount.h>
struct lmv_tgt_desc {
struct obd_uuid uuid;
- struct obd_export *ltd_exp;
+ struct obd_export *ltd_exp;
int active; /* is this target up for requests */
+ int idx;
};
struct lmv_obd {
int refcount;
spinlock_t lmv_lock;
struct lmv_desc desc;
- struct lmv_tgt_desc *tgts;
struct obd_uuid cluuid;
struct obd_export *exp;
- int tgts_size;
int connected;
int max_easize;
int max_def_easize;
int max_cookiesize;
int server_timeout;
struct semaphore init_sem;
+
+ struct lmv_tgt_desc *tgts;
+ int tgts_size;
+
+ struct obd_connect_data *datas;
+ int datas_size;
+
+ struct lu_fid *fids;
+ int fids_size;
+ spinlock_t fids_lock;
+
struct obd_connect_data conn_data;
};
int rc;
};
+#define LUSTRE_OPC_MKDIR (1 << 0)
+#define LUSTRE_OPC_SYMLINK (1 << 1)
+#define LUSTRE_OPC_MKNODE (1 << 2)
+#define LUSTRE_OPC_CREATE (1 << 3)
+
+struct placement_hint {
+ struct qstr *ph_pname;
+ struct qstr *ph_cname;
+ int ph_opc;
+};
/* device types (not names--FIXME) */
/* FIXME all the references to these defines need to be updated */
struct obd_connect_data *ocd);
int (*o_disconnect)(struct obd_export *exp);
+ /* may be later these should be moved into separate fid_ops */
+ int (*o_fid_alloc)(struct obd_export *exp, struct lu_fid *fid,
+ struct placement_hint *hint);
+
+ int (*o_fid_delete)(struct obd_export *exp, struct lu_fid *fid);
+
int (*o_statfs)(struct obd_device *obd, struct obd_statfs *osfs,
unsigned long max_age);
int (*o_packmd)(struct obd_export *exp, struct lov_mds_md **disk_tgt,
int (*m_cancel_unused)(struct obd_export *, struct lu_fid *,
int flags, void *opaque);
- int (*m_delete)(struct obd_export *, struct lu_fid *);
/*
* NOTE: If adding ops, add another LPROCFS_MD_OP_INIT() line to
RETURN(rc);
}
+static inline int obd_fid_alloc(struct obd_export *exp,
+ struct lu_fid *fid,
+ struct placement_hint *hint)
+{
+ int rc;
+ ENTRY;
+
+ EXP_CHECK_DT_OP(exp, fid_alloc);
+ OBD_COUNTER_INCREMENT(exp->exp_obd, fid_alloc);
+
+ rc = OBP(exp->exp_obd, fid_alloc)(exp, fid, hint);
+ RETURN(rc);
+}
+
+static inline int obd_fid_delete(struct obd_export *exp,
+ struct lu_fid *fid)
+{
+ int rc;
+ ENTRY;
+
+ if (OBP(exp->exp_obd, fid_delete) == NULL)
+ RETURN(0);
+
+ OBD_COUNTER_INCREMENT(exp->exp_obd, fid_delete);
+ rc = OBP(exp->exp_obd, fid_delete)(exp, fid);
+ RETURN(rc);
+}
+
static inline int obd_init_export(struct obd_export *exp)
{
int rc = 0;
RETURN(rc);
}
-static inline int md_delete(struct obd_export *exp,
- struct lu_fid *fid)
-{
- int rc;
- ENTRY;
-
- if (MDP(exp->exp_obd, delete) == NULL)
- RETURN(0);
-
- MD_COUNTER_INCREMENT(exp->exp_obd, delete);
- rc = MDP(exp->exp_obd, delete)(exp, fid);
- RETURN(rc);
-}
-
static inline int md_getattr(struct obd_export *exp, struct lu_fid *fid,
obd_valid valid, int ea_size,
struct ptlrpc_request **request)
#include "llite_internal.h"
/* allocates passed fid, that is assigns f_num and f_seq to the @fid */
-int ll_fid_md_alloc(struct ll_sb_info *sbi, struct lu_fid *fid)
+int ll_fid_md_alloc(struct ll_sb_info *sbi, struct lu_fid *fid,
+ struct placement_hint *hint)
{
+ int rc;
ENTRY;
- spin_lock(&sbi->ll_fid_lock);
- if (sbi->ll_fid.f_oid < LUSTRE_FID_SEQ_WIDTH) {
- sbi->ll_fid.f_oid += 1;
- *fid = sbi->ll_fid;
- } else {
- CERROR("sequence is exhausted. Switching to "
- "new one is not yet implemented\n");
- RETURN(-ERANGE);
+ rc = obd_fid_alloc(sbi->ll_md_exp, fid, hint);
+ if (rc) {
+ CERROR("cannot allocate new metadata fid, rc %d\n", rc);
+ RETURN(rc);
}
- spin_unlock(&sbi->ll_fid_lock);
- RETURN(0);
+ RETURN(rc);
}
/* allocates passed fid, that is assigns f_num and f_seq to the @fid */
-int ll_fid_dt_alloc(struct ll_sb_info *sbi, struct lu_fid *fid)
+int ll_fid_dt_alloc(struct ll_sb_info *sbi, struct lu_fid *fid,
+ struct placement_hint *hint)
{
+ int rc;
ENTRY;
- RETURN(-EOPNOTSUPP);
+
+ rc = obd_fid_alloc(sbi->ll_dt_exp, fid, hint);
+ if (rc) {
+ CERROR("cannot allocate new data fid, rc %d\n", rc);
+ RETURN(rc);
+ }
+
+ RETURN(rc);
}
/* build inode number on passed @fid */
struct list_head ll_deathrow; /* inodes to be destroyed (b1443) */
spinlock_t ll_deathrow_lock;
-
- /* last allocated fids */
- spinlock_t ll_fid_lock;
- struct lu_fid ll_fid;
};
struct ll_ra_read {
int ll_removexattr(struct dentry *dentry, const char *name);
/* llite/llite_fid.c*/
-int ll_fid_md_alloc(struct ll_sb_info *sbi, struct lu_fid *fid);
-int ll_fid_dt_alloc(struct ll_sb_info *sbi, struct lu_fid *fid);
+int ll_fid_md_alloc(struct ll_sb_info *sbi, struct lu_fid *fid,
+ struct placement_hint *hint);
+
+int ll_fid_dt_alloc(struct ll_sb_info *sbi, struct lu_fid *fid,
+ struct placement_hint *hint);
+
ino_t ll_fid_build_ino(struct ll_sb_info *sbi, struct lu_fid *fid);
#endif /* LLITE_INTERNAL_H */
struct ptlrpc_request *request = NULL;
struct lustre_handle osc_conn = {0, };
struct lustre_handle mdc_conn = {0, };
- struct lustre_md md;
struct obd_connect_data *data = NULL;
- struct obd_connect_data *md_data = NULL;
+ struct lustre_md md;
int err;
ENTRY;
if (data == NULL)
RETURN(-ENOMEM);
- OBD_ALLOC_PTR(md_data);
- if (md_data == NULL) {
- OBD_FREE_PTR(data);
- RETURN(-ENOMEM);
- }
-
if (proc_lustre_fs_root) {
err = lprocfs_register_mountpoint(proc_lustre_fs_root, sb,
osc, mdc);
if (err)
GOTO(out_mdc, err);
- /* MDC connect is surely finished by now */
- //LASSERT(class_conn2cliimp(&mdc_conn)->imp_connect_data);
- //*data = class_conn2cliimp(&mdc_conn)->imp_connect_data;
- //*md_data = class_exp2cliimp(sbi->ll_md_exp)->imp_connect_data;
-
LASSERT(osfs.os_bsize);
sb->s_blocksize = osfs.os_bsize;
sb->s_blocksize_bits = log2(osfs.os_bsize);
CDEBUG(D_SUPER, "rootfid "DFID3"\n", PFID3(&rootfid));
sbi->ll_root_fid = rootfid;
- spin_lock_init(&sbi->ll_fid_lock);
-
- /* initializing ->ll_fid. It is known that root object has separate
- * sequence, so that we use what MDS returned to us and do not check if
- * f_oid collides with root or not. */
- sbi->ll_fid.f_seq = md_data->ocd_seq;
- sbi->ll_fid.f_oid = LUSTRE_FID_INIT_OID;
-
sb->s_op = &lustre_super_operations;
/* make root inode
out:
if (data != NULL)
OBD_FREE_PTR(data);
- if (md_data != NULL)
- OBD_FREE_PTR(md_data);
lprocfs_unregister_mountpoint(sbi);
RETURN(err);
}
/* allocate new fid for child */
if (it->it_op == IT_OPEN || it->it_op == IT_CREAT) {
- rc = ll_fid_md_alloc(ll_i2sbi(parent), &op_data.fid2);
+ struct placement_hint hint = { .ph_pname = NULL,
+ .ph_cname = &dentry->d_name,
+ .ph_opc = LUSTRE_OPC_CREATE };
+
+ rc = ll_fid_md_alloc(ll_i2sbi(parent), &op_data.fid2, &hint);
if (rc) {
CERROR("can't allocate new fid, rc %d\n", rc);
LBUG();
static int ll_mknod(struct inode *dir, struct dentry *dchild, int mode,
ll_dev_t rdev)
{
+ struct placement_hint hint = { .ph_pname = NULL,
+ .ph_cname = &dchild->d_name,
+ .ph_opc = LUSTRE_OPC_MKNODE };
+
struct ptlrpc_request *request = NULL;
struct inode *inode = NULL;
struct ll_sb_info *sbi = ll_i2sbi(dir);
case S_IFIFO:
case S_IFSOCK:
/* allocate new fid */
- err = ll_fid_md_alloc(ll_i2sbi(dir), &op_data.fid2);
+ err = ll_fid_md_alloc(ll_i2sbi(dir), &op_data.fid2, &hint);
if (err) {
CERROR("can't allocate new fid, rc %d\n", err);
LBUG();
static int ll_symlink_raw(struct nameidata *nd, const char *tgt)
{
+ struct placement_hint hint = { .ph_pname = NULL,
+ .ph_cname = &nd->dentry->d_name,
+ .ph_opc = LUSTRE_OPC_SYMLINK };
+
struct inode *dir = nd->dentry->d_inode;
struct ptlrpc_request *request = NULL;
struct ll_sb_info *sbi = ll_i2sbi(dir);
dir, tgt);
/* allocate new fid */
- err = ll_fid_md_alloc(ll_i2sbi(dir), &op_data.fid2);
+ err = ll_fid_md_alloc(ll_i2sbi(dir), &op_data.fid2, &hint);
if (err) {
CERROR("can't allocate new fid, rc %d\n", err);
LBUG();
}
ll_prepare_md_op_data(&op_data, dir, NULL,
- nd->last.name, nd->last.len, 0);
+ nd->last.name, nd->last.len, 0);
err = md_create(sbi->ll_md_exp, &op_data,
tgt, strlen(tgt) + 1, S_IFLNK | S_IRWXUGO,
static int ll_mkdir_raw(struct nameidata *nd, int mode)
{
+ struct placement_hint hint = { .ph_pname = NULL,
+ .ph_cname = &nd->dentry->d_name,
+ .ph_opc = LUSTRE_OPC_MKDIR };
struct inode *dir = nd->dentry->d_inode;
struct ptlrpc_request *request = NULL;
struct ll_sb_info *sbi = ll_i2sbi(dir);
mode = (mode & (S_IRWXUGO|S_ISVTX) & ~current->fs->umask) | S_IFDIR;
/* allocate new fid */
- err = ll_fid_md_alloc(ll_i2sbi(dir), &op_data.fid2);
+ err = ll_fid_md_alloc(ll_i2sbi(dir), &op_data.fid2, &hint);
if (err) {
CERROR("can't allocate new fid, rc %d\n", err);
LBUG();
lmv->exp = exp;
lmv->connected = 0;
lmv->cluuid = *cluuid;
+
+ /* saving */
if (data)
memcpy(&lmv->conn_data, data, sizeof(*data));
{
struct lmv_obd *lmv = &obd->u.lmv;
struct obd_uuid *cluuid = &lmv->cluuid;
+ struct obd_connect_data *mdc_data = NULL;
struct obd_uuid lmv_mdc_uuid = { "LMV_MDC_UUID" };
struct lustre_handle conn = {0, };
struct obd_device *mdc_obd;
}
mdc_exp = class_conn2export(&conn);
+ mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
rc = obd_register_observer(mdc_obd, obd);
if (rc) {
}
}
- tgt->ltd_exp = mdc_exp;
tgt->active = 1;
+ tgt->ltd_exp = mdc_exp;
lmv->desc.ld_active_tgt_count++;
+ /* copy connect data, it may be used later */
+ lmv->datas[tgt->idx] = *mdc_data;
+
+ /* setup start fid for this target */
+ lmv->fids[tgt->idx].f_seq = mdc_data->ocd_seq;
+ lmv->fids[tgt->idx].f_oid = LUSTRE_FID_INIT_OID;
+
md_init_ea_size(tgt->ltd_exp, lmv->max_easize,
lmv->max_def_easize, lmv->max_cookiesize);
RETURN(rc);
}
+static int lmv_fids_balanced(struct obd_device *obd)
+{
+ ENTRY;
+ RETURN(0);
+}
+
+/* returns number of target where new fid should be allocated using passed @hint
+ * as input data for making decision. */
+static int lmv_plcament_policy(struct obd_device *obd,
+ struct placement_hint *hint)
+{
+ ENTRY;
+
+ /* here are some policies to allocate new fid */
+ if (hint->ph_cname && lmv_fids_balanced(obd)) {
+ /* allocate new fid basing on its name in the case fids are
+ * balanced, that is all sequences have more or less equal
+ * number of objects created. */
+ } else {
+ /* sequences among all tgts are not well balanced, allocate new
+ * fid taking this into account to balance them. */
+ }
+
+ RETURN(0);
+}
+
+static int lmv_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
+ struct placement_hint *hint)
+{
+ struct obd_device *obd = class_exp2obd(exp);
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct lu_fid *tgt_fid;
+ int rc = 0, i;
+ ENTRY;
+
+ LASSERT(fid != NULL);
+ LASSERT(hint != NULL);
+
+ i = lmv_plcament_policy(obd, hint);
+ if (i < 0 || i >= lmv->desc.ld_tgt_count) {
+ CERROR("can't get target for allocating fid\n");
+ RETURN(-EINVAL);
+ }
+
+ tgt_fid = &lmv->fids[i];
+
+ spin_lock(&lmv->fids_lock);
+ if (fid_oid(tgt_fid) < LUSTRE_FID_SEQ_WIDTH) {
+ tgt_fid->f_oid += 1;
+ *fid = *tgt_fid;
+ } else {
+ CERROR("sequence is exhausted. Switching to "
+ "new one is not yet implemented\n");
+ rc = -ERANGE;
+ }
+ spin_unlock(&lmv->fids_lock);
+ RETURN(rc);
+}
+
+static int lmv_fid_delete(struct obd_export *exp, struct lu_fid *fid)
+{
+ ENTRY;
+
+ LASSERT(exp && fid);
+ if (lmv_obj_delete(exp, fid)) {
+ CDEBUG(D_OTHER, "lmv object "DFID3" is destroyed.\n",
+ PFID3(fid));
+ }
+ RETURN(0);
+}
+
static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
struct lmv_obd *lmv = &obd->u.lmv;
struct lprocfs_static_vars lvars;
struct lmv_desc *desc;
- int rc = 0;
+ int rc, i = 0;
ENTRY;
if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
lmv->tgts_size = LMV_MAX_TGT_COUNT * sizeof(struct lmv_tgt_desc);
OBD_ALLOC(lmv->tgts, lmv->tgts_size);
- if (lmv->tgts == NULL) {
- CERROR("Out of memory\n");
+ if (lmv->tgts == NULL)
RETURN(-ENOMEM);
- }
+
+ for (i = 0; i < LMV_MAX_TGT_COUNT; i++)
+ lmv->tgts[i].idx = i;
+
+ lmv->datas_size = LMV_MAX_TGT_COUNT * sizeof(struct obd_connect_data);
+
+ OBD_ALLOC(lmv->datas, lmv->datas_size);
+ if (lmv->datas == NULL)
+ GOTO(out_free_tgts, rc = -ENOMEM);
+
+ lmv->fids_size = LMV_MAX_TGT_COUNT * sizeof(struct lu_fid);
+
+ OBD_ALLOC(lmv->fids, lmv->fids_size);
+ if (lmv->fids == NULL)
+ GOTO(out_free_datas, rc = -ENOMEM);
obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
lmv->desc.ld_tgt_count = 0;
lmv->max_def_easize = 0;
lmv->max_easize = 0;
+ spin_lock_init(&lmv->fids_lock);
spin_lock_init(&lmv->lmv_lock);
sema_init(&lmv->init_sem, 1);
if (rc) {
CERROR("Can't setup LMV object manager, "
"error %d.\n", rc);
- OBD_FREE(lmv->tgts, lmv->tgts_size);
+ GOTO(out_free_datas, rc);
RETURN(rc);
}
}
#endif
RETURN(0);
+
+out_free_datas:
+ OBD_FREE(lmv->datas, lmv->datas_size);
+ lmv->datas = NULL;
+out_free_tgts:
+ OBD_FREE(lmv->tgts, lmv->tgts_size);
+ lmv->tgts = NULL;
+ return rc;
}
static int lmv_cleanup(struct obd_device *obd)
lprocfs_obd_cleanup(obd);
lmv_mgr_cleanup(obd);
+ OBD_FREE(lmv->fids, lmv->fids_size);
+ OBD_FREE(lmv->datas, lmv->datas_size);
OBD_FREE(lmv->tgts, lmv->tgts_size);
RETURN(0);
RETURN(rc);
}
-static int lmv_delete(struct obd_export *exp, struct lu_fid *fid)
-{
- ENTRY;
-
- LASSERT(exp && fid);
- if (lmv_obj_delete(exp, fid)) {
- CDEBUG(D_OTHER, "lmv object "DFID3" is destroyed.\n",
- PFID3(fid));
- }
- RETURN(0);
-}
-
static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
.o_packmd = lmv_packmd,
.o_unpackmd = lmv_unpackmd,
.o_notify = lmv_notify,
- .o_iocontrol = lmv_iocontrol,
+ .o_fid_alloc = lmv_fid_alloc,
+ .o_fid_delete = lmv_fid_delete,
+ .o_iocontrol = lmv_iocontrol
};
struct md_ops lmv_md_ops = {
.m_readpage = lmv_readpage,
.m_unlink = lmv_unlink,
.m_init_ea_size = lmv_init_ea_size,
- .m_delete = lmv_delete,
.m_cancel_unused = lmv_cancel_unused,
.m_set_lock_data = lmv_set_lock_data,
.m_lock_match = lmv_lock_match,
LPROCFS_OBD_OP_INIT(num_private_stats, stats, connect);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, reconnect);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, disconnect);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_alloc);
+ LPROCFS_OBD_OP_INIT(num_private_stats, stats, fid_delete);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, statfs);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, packmd);
LPROCFS_OBD_OP_INIT(num_private_stats, stats, unpackmd);
LASSERT(obd->obd_proc_entry != NULL);
LASSERT(obd->md_cntr_base == 0);
- num_stats = 1 + MD_COUNTER_OFFSET(delete) +
+ num_stats = 1 + MD_COUNTER_OFFSET(cancel_unused) +
num_private_stats;
stats = lprocfs_alloc_stats(num_stats);
if (stats == NULL)
LPROCFS_MD_OP_INIT(num_private_stats, stats, set_open_replay_data);
LPROCFS_MD_OP_INIT(num_private_stats, stats, clear_open_replay_data);
LPROCFS_MD_OP_INIT(num_private_stats, stats, set_lock_data);
- LPROCFS_MD_OP_INIT(num_private_stats, stats, delete);
for (i = num_private_stats; i < num_stats; i++) {
if (stats->ls_percpu[0]->lp_cntr[i].lc_name == NULL) {