#define LUSTRE_CONFIG_SET 0
#define LUSTRE_CONFIG_GET 1
-/* meta-sequence */
-struct lu_msq {
- __u64 m_ran; /* holds number of ranges allocated to clients. Thus,
- * server allocates 2 ^ 64 ranges. */
-
- __u32 m_seq; /* holds number of sequences allocated in a range. Thus,
- * each client may use 2 ^ 32 sequences before asking
- * server to allocate new. */
-
- __u32 m_pad; /* padding */
-};
-
-extern void lustre_swab_msq(struct lu_msq *msq);
-
-static inline __u64 msq_ran(struct lu_msq *msq)
-{
- return msq->m_ran;
-}
-
-static inline __u32 msq_seq(struct lu_msq *msq)
-{
- return msq->m_seq;
-}
-
-#define DSEQ "["LPU64"/%u]"
-
-#define PSEQ(seq) \
- msq_ran(seq), \
- msq_seq(seq)
-
#define LUSTRE_CONFIG_METASEQ "metaseq"
#define LUSTRE_CONFIG_TRANSNO "transno"
__u32 f_ver; /* holds fid version. */
};
+#define LUSTRE_ROOT_FID_SEQ 1
+#define LUSTRE_ROOT_FID_OID 2
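+/* note: with the fid-to-inode mapping in ll_fid2ino(), the root fid
+ * [seq=1, oid=2] yields inode number 2, the conventional root inode */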
+
/* maximal objects in sequence */
#define LUSTRE_FID_SEQ_WIDTH 10000
+/* initial fid oid value */
+#define LUSTRE_FID_INIT_OID 1
+
/* shift of version component */
#define LUSTRE_FID_VER_SHIFT (sizeof(((struct lu_fid *)0)->f_ver) * 8)
__u32 ocd_index; /* LOV index to connect to */
__u32 ocd_unused;
__u64 ocd_ibits_known; /* inode bits this client understands */
- struct lu_msq ocd_msq; /* meta-sequence info */
+ __u64 ocd_seq; /* sequence info for client */
__u64 padding2; /* also fix lustre_swab_connect */
__u64 padding3; /* also fix lustre_swab_connect */
__u64 padding4; /* also fix lustre_swab_connect */
+ __u64 padding5; /* also fix lustre_swab_connect */
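+ /* note: ocd_seq (8 bytes) plus the new padding5 (8 bytes) replace
+ * the 16-byte lu_msq, so the wire size of this struct is unchanged */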
};
extern void lustre_swab_connect(struct obd_connect_data *ocd);
int ll_fid_alloc(struct ll_sb_info *sbi, struct lu_fid *fid)
{
ENTRY;
+
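+ /* each sequence provides up to LUSTRE_FID_SEQ_WIDTH object ids;
+ * once exhausted, a new sequence must be obtained from the MDS
+ * (not yet implemented, hence the LBUG() below). */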
+ spin_lock(&sbi->ll_fid_lock);
+ if (sbi->ll_md_fid.f_oid < LUSTRE_FID_SEQ_WIDTH) {
+ sbi->ll_md_fid.f_oid += 1;
+ *fid = sbi->ll_md_fid;
+ } else {
+ CERROR("sequence is exhausted. Switching to "
+ "new one is not yet implemented\n");
+ LBUG();
+ }
+ spin_unlock(&sbi->ll_fid_lock);
+
RETURN(0);
}
/* build inode number on passed @fid */
unsigned long ll_fid2ino(struct ll_sb_info *sbi, struct lu_fid *fid)
{
+ unsigned long ino;
ENTRY;
- RETURN(0);
+
+ /* naive fid-based inode allocation algorithm; it is simple but
+ * has many downsides. */
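+ /* e.g. the root fid [seq=1, oid=2] maps to ino 2, and a fid
+ * [seq=3, oid=5] maps to 2 * 10000 + 5 = 20005 */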
+ ino = (fid_seq(fid) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(fid);
+ RETURN(ino);
}
struct list_head ll_deathrow; /* inodes to be destroyed (b1443) */
spinlock_t ll_deathrow_lock;
- struct lu_fid ll_fid;
+ /* last allocated fids */
+ spinlock_t ll_fid_lock;
+ struct lu_fid ll_dt_fid;
+ struct lu_fid ll_md_fid;
};
struct ll_ra_read {
struct lustre_handle mdc_conn = {0, };
struct lustre_md md;
struct obd_connect_data *data = NULL;
+ struct obd_connect_data *md_data = NULL;
+ struct obd_connect_data *dt_data = NULL;
int err;
ENTRY;
/* async connect is surely finished by now */
*data = class_exp2cliimp(sbi->ll_mdc_exp)->imp_connect_data;
+ md_data = &class_exp2cliimp(sbi->ll_mdc_exp)->imp_connect_data;
LASSERT(osfs.os_bsize);
sb->s_blocksize = osfs.os_bsize;
spin_unlock(&sbi->ll_lco.lco_lock);
mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
+ dt_data = &class_exp2cliimp(sbi->ll_osc_exp)->imp_connect_data;
err = obd_prep_async_page(sbi->ll_osc_exp, NULL, NULL, NULL,
0, NULL, NULL, NULL);
CDEBUG(D_SUPER, "rootfid "DFID3"\n", PFID3(&rootfid));
sbi->ll_root_fid = rootfid;
+ spin_lock_init(&sbi->ll_fid_lock);
+
+ /* initializing @ll_md_fid. The root object is known to live in a
+ * separate sequence, so we use the sequence the MDS returned and do
+ * not check whether f_oid collides with the root fid. */
+ sbi->ll_md_fid.f_seq = md_data->ocd_seq;
+ sbi->ll_md_fid.f_oid = LUSTRE_FID_INIT_OID;
+
+ /* initializing @ll_dt_fid */
+ sbi->ll_dt_fid.f_seq = dt_data->ocd_seq;
+ sbi->ll_dt_fid.f_oid = LUSTRE_FID_INIT_OID;
+
sb->s_op = &lustre_super_operations;
/* make root inode
static int mdd_fs_setup(struct mdd_device *mdd)
{
- mdd->mdd_rootfid.f_seq = ROOT_FID_SEQ;
- mdd->mdd_rootfid.f_oid = ROOT_FID_OID;
+ mdd->mdd_rootfid.f_seq = LUSTRE_ROOT_FID_SEQ;
+ mdd->mdd_rootfid.f_oid = LUSTRE_ROOT_FID_OID;
mdd->mdd_rootfid.f_ver = 0;
return 0;
}
#ifndef _MDD_INTERNAL_H
#define _MDD_INTERNAL_H
-#define ROOT_FID_SEQ 0
-#define ROOT_FID_OID 2
-
struct mdd_device {
struct md_device mdd_md_dev;
struct dt_device *mdd_child;
prntfn, c->psc_num_threads);
}
-/* default meta-sequenve values */
-#define LUSTRE_METASEQ_DEFAULT_RAN 0
-#define LUSTRE_METASEQ_DEFAULT_SEQ 0
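+/* pass a named config request (get or set) down to the underlying md
+ * device; returns -EOPNOTSUPP if the lower layer has no ->mdo_config() */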
+static int mdt_config(struct mdt_device *m, const char *name,
+ void *buf, int size, int mode)
+{
+ struct md_device *child = m->mdt_child;
+ int rc;
+ ENTRY;
+
+ if (!child->md_ops->mdo_config)
+ RETURN(-EOPNOTSUPP);
+
+ rc = child->md_ops->mdo_config(child, name, buf, size, mode);
+ RETURN(rc);
+}
-/* allocate meta-sequence to client */
-int mdt_alloc_metaseq(struct mdt_device *m, struct lu_msq *msq)
+/* allocate sequence to client */
+int mdt_alloc_seq(struct mdt_device *m, __u64 *seq)
{
+ int rc = 0;
ENTRY;
LASSERT(m != NULL);
- LASSERT(msq != NULL);
+ LASSERT(seq != NULL);
- spin_lock(&m->mdt_msq_lock);
-
- /* to be continued */
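+ /* a semaphore replaces the old spinlock because the mdt_config()
+ * call below may sleep on storage I/O */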
+ down(&m->mdt_seq_sem);
+ m->mdt_seq += 1;
+ *seq = m->mdt_seq;
+
+ /* update new allocated sequence on store */
+ rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
+ &m->mdt_seq, sizeof(m->mdt_seq),
+ LUSTRE_CONFIG_SET);
+ if (rc)
+ CERROR("can't save new seq, rc %d\n", rc);
- spin_unlock(&m->mdt_msq_lock);
+ up(&m->mdt_seq_sem);
RETURN(0);
}
-/* initialize meta-sequence. First of all try to get it from lower layer down to
- * back store one. In the case this is first run and there is not meta-sequence
- * initialized yet - store it to backstore. */
+/* Initialize the sequence counter. First try to read it from the backing
+ * store through the lower layer. If this is the first run and no sequence
+ * has been saved yet, store the initial value to the backing store. */
-static int mdt_init_metaseq(struct mdt_device *m)
+static int mdt_init_seq(struct mdt_device *m)
{
- struct md_device *child = m->mdt_child;
- int rc;
+ int rc = 0;
ENTRY;
- m->mdt_msq.m_ran = LUSTRE_METASEQ_DEFAULT_RAN;
- m->mdt_msq.m_seq = LUSTRE_METASEQ_DEFAULT_SEQ;
+ /* allocate next seq after root one */
+ m->mdt_seq = LUSTRE_ROOT_FID_SEQ + 1;
+
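+ /* read the last saved sequence from the store; on success this
+ * overwrites the default set above */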
+ rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
+ &m->mdt_seq, sizeof(m->mdt_seq),
+ LUSTRE_CONFIG_GET);
- if (!child->md_ops->mdo_config)
- GOTO(out, rc = 0);
-
- rc = child->md_ops->mdo_config(child, LUSTRE_CONFIG_METASEQ,
- &m->mdt_msq, sizeof(m->mdt_msq),
- LUSTRE_CONFIG_GET);
if (rc == -EOPNOTSUPP) {
- /* provide zero error and let contnibnue with default values of
- * meta-sequence. */
+ /* not supported by the lower layer; return zero and continue
+ * with the default sequence value. */
GOTO(out, rc = 0);
} else if (rc == -ENODATA) {
- CWARN("initialize new meta-sequence\n");
-
- /*initialize new meta-sequence config as it is not yet
- * created. */
- rc = child->md_ops->mdo_config(child, LUSTRE_CONFIG_METASEQ,
- &m->mdt_msq, sizeof(m->mdt_msq),
- LUSTRE_CONFIG_SET);
- if (rc) {
+ CWARN("initialize new sequence\n");
+
+ /* initialize new sequence config as it is not yet created. */
+ rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
+ &m->mdt_seq, sizeof(m->mdt_seq),
+ LUSTRE_CONFIG_SET);
+ if (rc == -EOPNOTSUPP) {
+ /* lower layer cannot save it; continue with the
+ * default sequence value. */
+ CERROR("can't save initial sequence: "
+ "no method defined\n");
+ GOTO(out, rc = 0);
+ } else if (rc) {
CERROR("can't update config %s, rc %d\n",
LUSTRE_CONFIG_METASEQ, rc);
GOTO(out, rc);
}
- } else {
+ } else if (rc) {
CERROR("can't get config %s, rc %d\n",
LUSTRE_CONFIG_METASEQ, rc);
GOTO(out, rc);
EXIT;
out:
- if (rc == 0) {
- CWARN("initialized meta-sequence: "DSEQ"\n",
- PSEQ(&m->mdt_msq));
- }
+ if (rc == 0)
+ CWARN("last used sequence: "LPU64"\n", m->mdt_seq);
return rc;
}
if (m->mdt_child)
mdt_child = md2lu_dev(m->mdt_child);
- spin_lock_init(&m->mdt_msq_lock);
+ sema_init(&m->mdt_seq_sem, 1);
m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE;
LBUG();
}
- /* init meta-sequence info after device stack is initialized. */
- rc = mdt_init_metaseq(m);
+ /* init sequence info after device stack is initialized. */
+ rc = mdt_init_seq(m);
if (rc)
GOTO(err_fini_child, rc);
memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
med->med_mcd = mcd;
- rc = mdt_alloc_metaseq(mdt_dev(obd->obd_lu_dev),
- &data->ocd_msq);
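+ /* allocate a dedicated sequence for this client; it is returned
+ * to the client in obd_connect_data::ocd_seq */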
+ rc = mdt_alloc_seq(mdt_dev(obd->obd_lu_dev),
+ &data->ocd_seq);
if (rc)
GOTO(out, rc);
out:
unsigned long mdt_flags;
/* Seq management related stuff */
- spinlock_t mdt_msq_lock;
- struct lu_msq mdt_msq;
+ struct semaphore mdt_seq_sem;
+ __u64 mdt_seq;
};
static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m)
};
-int mdt_alloc_metaseq(struct mdt_device *m, struct lu_msq *msq);
+int mdt_alloc_seq(struct mdt_device *, __u64 *);
int fid_lock(struct ldlm_namespace *, const struct lu_fid *,
struct lustre_handle *, ldlm_mode_t,
{
/* all objects with same id and different versions will belong to same
* collisions list. */
- return fid_seq(f) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f);
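+ /* fid sequences now start at LUSTRE_ROOT_FID_SEQ (1), so shift
+ * down by one so hash values start at zero, matching ll_fid2ino() */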
+ return (fid_seq(f) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f);
}
struct lu_object *lu_object_find(struct lu_site *s, const struct lu_fid *f)
* lustre_idl.h implemented here.
*/
-void lustre_swab_msq(struct lu_msq *msq)
-{
- __swab64s (&msq->m_ran);
- __swab32s (&msq->m_seq);
-}
-
void lustre_swab_connect(struct obd_connect_data *ocd)
{
__swab64s (&ocd->ocd_connect_flags);
__swab32s (&ocd->ocd_index);
__swab32s (&ocd->ocd_unused);
__swab64s (&ocd->ocd_ibits_known);
- lustre_swab_msq(&ocd->ocd_msq);
+ __swab64s (&ocd->ocd_seq);
CLASSERT(offsetof(typeof(*ocd), padding2) != 0);
CLASSERT(offsetof(typeof(*ocd), padding3) != 0);
CLASSERT(offsetof(typeof(*ocd), padding4) != 0);
+ CLASSERT(offsetof(typeof(*ocd), padding5) != 0);
}
void lustre_swab_obdo (struct obdo *o)