Whamcloud - gitweb
- for proto better use simple seq. allocation without using meta-sequence which is...
authoryury <yury>
Tue, 11 Apr 2006 16:36:40 +0000 (16:36 +0000)
committeryury <yury>
Tue, 11 Apr 2006 16:36:40 +0000 (16:36 +0000)
- implement simple ll_fid_alloc() which allocates new fids on client;
- implemented simple ll_fid2ino() which allocates client inode numbers from passed fid;

- root object is allocated in separate sequence with number 1. Sequences and fid numbers with value 0 are not allowed by ldlm - fixing that. First seq is 1, first fid in seq is 1;

- implemented simple mdt_alloc_seq() which allocates new sequences to clients;
- fixes in mdt_init_seq();
- using sema instead of spinlock in mdt_alloc_seq(), as bumping the seq and saving it to back store should be atomic operation and sleeping on possible IO with lcoked spin lock is not allowed.

lustre/include/linux/lustre_idl.h
lustre/llite/llite_fid.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/mdd/mdd_handler.c
lustre/mdd/mdd_internal.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/obdclass/lu_object.c
lustre/ptlrpc/pack_generic.c

index 1afbd18..338db74 100644 (file)
@@ -605,36 +605,6 @@ typedef enum {
 #define LUSTRE_CONFIG_SET 0
 #define LUSTRE_CONFIG_GET 1
 
-/* meta-sequence */
-struct lu_msq {
-        __u64 m_ran; /* holds number of ranges allocated to clients. Thus,
-                      * server allocates 2 ^ 64 ranges. */
-        
-        __u32 m_seq; /* holds number of sequences allocated in a range. Thus,
-                      * each client may use 2 ^ 32 sequences before asking
-                      * server to allocate new. */
-        
-        __u32 m_pad; /* padding */
-};
-
-extern void lustre_swab_msq(struct lu_msq *msq);
-
-static inline __u64 msq_ran(struct lu_msq *msq)
-{
-        return msq->m_ran;
-}
-
-static inline __u32 msq_seq(struct lu_msq *msq)
-{
-        return msq->m_seq;
-}
-
-#define DSEQ "["LPU64"/%u]"
-
-#define PSEQ(seq)     \
-        msq_ran(seq), \
-        msq_seq(seq)
-
 #define LUSTRE_CONFIG_METASEQ "metaseq"
 #define LUSTRE_CONFIG_TRANSNO "transno"
 
@@ -646,9 +616,15 @@ struct lu_fid {
         __u32 f_ver;  /* holds fid version. */
 };
 
+#define LUSTRE_ROOT_FID_SEQ 1
+#define LUSTRE_ROOT_FID_OID 2
+
 /* maximal objects in sequence */
 #define LUSTRE_FID_SEQ_WIDTH 10000
 
+/* initial fid id value */
+#define LUSTRE_FID_INIT_OID  1
+
 /* shift of version component */
 #define LUSTRE_FID_VER_SHIFT (sizeof(((struct lu_fid *)0)->f_ver) * 8)
 
@@ -717,10 +693,11 @@ struct obd_connect_data {
         __u32          ocd_index;                /* LOV index to connect to */
         __u32          ocd_unused;
         __u64          ocd_ibits_known;          /* inode bits this client understands */
-        struct lu_msq  ocd_msq;                  /* meta-sequence info */
+        __u64          ocd_seq;                  /* sequence info for client */
         __u64          padding2;                 /* also fix lustre_swab_connect */
         __u64          padding3;                 /* also fix lustre_swab_connect */
         __u64          padding4;                 /* also fix lustre_swab_connect */
+        __u64          padding5;                 /* also fix lustre_swab_connect */
 };
 
 extern void lustre_swab_connect(struct obd_connect_data *ocd);
index 8c40db3..0c8ae2f 100644 (file)
 int ll_fid_alloc(struct ll_sb_info *sbi, struct lu_fid *fid)
 {
         ENTRY;
+
+        spin_lock(&sbi->ll_fid_lock);
+        if (sbi->ll_md_fid.f_oid < LUSTRE_FID_SEQ_WIDTH) {
+                sbi->ll_md_fid.f_oid += 1;
+                *fid = sbi->ll_md_fid;
+        } else {
+                CERROR("sequence is exhausted. Switching to "
+                       "new one is not yet implemented\n");
+                LBUG();
+        }
+        spin_unlock(&sbi->ll_fid_lock);
+        
         RETURN(0);
 }
 
 /* build inode number on passed @fid */
 unsigned long ll_fid2ino(struct ll_sb_info *sbi, struct lu_fid *fid)
 {
+        unsigned long ino;
         ENTRY;
-        RETURN(0);
+
+        /* very stupid and having many downsides inode allocation algorithm
+         * based on fid. */
+        ino = (fid_seq(fid) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(fid);
+        RETURN(ino);
 }
index 3daa660..5aec267 100644 (file)
@@ -181,7 +181,10 @@ struct ll_sb_info {
         struct list_head          ll_deathrow; /* inodes to be destroyed (b1443) */
         spinlock_t                ll_deathrow_lock;
 
-        struct lu_fid             ll_fid;
+        /* last allocated fids */
+        spinlock_t                ll_fid_lock;
+        struct lu_fid             ll_dt_fid;
+        struct lu_fid             ll_md_fid;
 };
 
 struct ll_ra_read {
index 7a3bf0e..c797015 100644 (file)
@@ -117,6 +117,8 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc)
         struct lustre_handle mdc_conn = {0, };
         struct lustre_md md;
         struct obd_connect_data *data = NULL;
+        struct obd_connect_data *md_data = NULL;
+        struct obd_connect_data *dt_data = NULL;
         int err;
         ENTRY;
 
@@ -174,6 +176,7 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc)
 
         /* async connect is surely finished by now */
         *data = class_exp2cliimp(sbi->ll_mdc_exp)->imp_connect_data;
+        md_data = &class_exp2cliimp(sbi->ll_mdc_exp)->imp_connect_data;
 
         LASSERT(osfs.os_bsize);
         sb->s_blocksize = osfs.os_bsize;
@@ -242,6 +245,7 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc)
         spin_unlock(&sbi->ll_lco.lco_lock);
 
         mdc_init_ea_size(sbi->ll_mdc_exp, sbi->ll_osc_exp);
+        dt_data = &class_exp2cliimp(sbi->ll_osc_exp)->imp_connect_data;
 
         err = obd_prep_async_page(sbi->ll_osc_exp, NULL, NULL, NULL,
                                   0, NULL, NULL, NULL);
@@ -270,6 +274,18 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc)
         CDEBUG(D_SUPER, "rootfid "DFID3"\n", PFID3(&rootfid));
         sbi->ll_root_fid = rootfid;
 
+        spin_lock_init(&sbi->ll_fid_lock);
+        
+        /* initializing @ll_md_fid. It is known that root object has separate
+         * sequence, so that we use what MDS returned to us and do not check if
+         * f_oid collides with root or not. */
+        sbi->ll_md_fid.f_seq = md_data->ocd_seq;
+        sbi->ll_md_fid.f_oid = LUSTRE_FID_INIT_OID;
+
+        /* initializing @ll_dt_fid */
+        sbi->ll_md_fid.f_seq = dt_data->ocd_seq;
+        sbi->ll_md_fid.f_oid = LUSTRE_FID_INIT_OID;
+
         sb->s_op = &lustre_super_operations;
 
         /* make root inode
index 60e4d9f..8f12ef1 100644 (file)
@@ -250,8 +250,8 @@ static int mdd_object_print(struct seq_file *f, const struct lu_object *o)
 
 static int mdd_fs_setup(struct mdd_device *mdd)
 {
-        mdd->mdd_rootfid.f_seq = ROOT_FID_SEQ;
-        mdd->mdd_rootfid.f_oid = ROOT_FID_OID;
+        mdd->mdd_rootfid.f_seq = LUSTRE_ROOT_FID_SEQ;
+        mdd->mdd_rootfid.f_oid = LUSTRE_ROOT_FID_OID;
         mdd->mdd_rootfid.f_ver = 0;        
         return 0;
 }
index 32a90d7..236bd1e 100644 (file)
@@ -4,9 +4,6 @@
 #ifndef _MDD_INTERNAL_H
 #define _MDD_INTERNAL_H
 
-#define ROOT_FID_SEQ 0
-#define ROOT_FID_OID 2
-
 struct mdd_device {
         struct md_device                 mdd_md_dev;
         struct dt_device                *mdd_child;
index 6d7ea1e..c4d869e 100644 (file)
@@ -929,23 +929,43 @@ struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
                                prntfn, c->psc_num_threads);
 }
 
-/* default meta-sequenve values */
-#define LUSTRE_METASEQ_DEFAULT_RAN 0
-#define LUSTRE_METASEQ_DEFAULT_SEQ 0
+static int mdt_config(struct mdt_device *m, const char *name,
+                      void *buf, int size, int mode)
+{
+        struct md_device *child = m->mdt_child;
+        int rc;
+        ENTRY;
+
+        if (!child->md_ops->mdo_config)
+                RETURN(-EOPNOTSUPP);
+        
+        rc = child->md_ops->mdo_config(child, name, buf, size, mode);
+        RETURN(rc);
+}
 
-/* allocate meta-sequence to client */
-int mdt_alloc_metaseq(struct mdt_device *m, struct lu_msq *msq)
+/* allocate sequence to client */
+int mdt_alloc_seq(struct mdt_device *m, __u64 *seq)
 {
+        int rc = 0;
         ENTRY;
 
         LASSERT(m != NULL);
-        LASSERT(msq != NULL);
+        LASSERT(seq != NULL);
         
-        spin_lock(&m->mdt_msq_lock);
-
-        /* to be continued */
+        down(&m->mdt_seq_sem);
+        m->mdt_seq += 1;
+        *seq = m->mdt_seq;
+
+        /* update new allocated sequence on store */
+        rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
+                        &m->mdt_seq, sizeof(m->mdt_seq),
+                        LUSTRE_CONFIG_SET);
+        if (rc) {
+                CERROR("can't save new seq, rc %d\n",
+                       rc);
+        }
         
-        spin_unlock(&m->mdt_msq_lock);
+        up(&m->mdt_seq_sem);
 
         RETURN(0);
 }
@@ -953,39 +973,41 @@ int mdt_alloc_metaseq(struct mdt_device *m, struct lu_msq *msq)
 /* initialize meta-sequence. First of all try to get it from lower layer down to
  * back store one. In the case this is first run and there is not meta-sequence
  * initialized yet - store it to backstore. */
-static int mdt_init_metaseq(struct mdt_device *m)
+static int mdt_init_seq(struct mdt_device *m)
 {
-        struct md_device *child = m->mdt_child;
-        int rc;
+        int rc = 0;
         ENTRY;
 
-        m->mdt_msq.m_ran = LUSTRE_METASEQ_DEFAULT_RAN;
-        m->mdt_msq.m_seq = LUSTRE_METASEQ_DEFAULT_SEQ;
+        /* allocate next seq after root one */
+        m->mdt_seq = LUSTRE_ROOT_FID_SEQ + 1;
+
+        rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
+                        &m->mdt_seq, sizeof(m->mdt_seq),
+                        LUSTRE_CONFIG_GET);
 
-        if (!child->md_ops->mdo_config)
-                GOTO(out, rc = 0);
-        
-        rc = child->md_ops->mdo_config(child, LUSTRE_CONFIG_METASEQ,
-                                       &m->mdt_msq, sizeof(m->mdt_msq),
-                                       LUSTRE_CONFIG_GET);
         if (rc == -EOPNOTSUPP) {
-                /* provide zero error and let contnibnue with default values of
-                 * meta-sequence. */
+                /* provide zero error and let continue with default value of
+                 * sequence. */
                 GOTO(out, rc = 0);
         } else if (rc == -ENODATA) {
-                CWARN("initialize new meta-sequence\n");
-
-                /*initialize new meta-sequence config as it is not yet
-                 * created. */
-                rc = child->md_ops->mdo_config(child, LUSTRE_CONFIG_METASEQ,
-                                               &m->mdt_msq, sizeof(m->mdt_msq),
-                                               LUSTRE_CONFIG_SET);
-                if (rc) {
+                CWARN("initialize new sequence\n");
+
+                /*initialize new sequence config as it is not yet created. */
+                rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
+                                &m->mdt_seq, sizeof(m->mdt_seq),
+                                LUSTRE_CONFIG_SET);
+                if (rc == -EOPNOTSUPP) {
+                        /* provide zero error and let continue with default
+                         * value of sequence. */
+                        CERROR("can't update save initial sequence. "
+                               "No method defined\n");
+                        GOTO(out, rc = 0);
+                } else if (rc) {
                         CERROR("can't update config %s, rc %d\n",
                                LUSTRE_CONFIG_METASEQ, rc);
                         GOTO(out, rc);
                 }
-        } else {
+        } else if (rc) {
                 CERROR("can't get config %s, rc %d\n",
                        LUSTRE_CONFIG_METASEQ, rc);
                 GOTO(out, rc);
@@ -993,10 +1015,8 @@ static int mdt_init_metaseq(struct mdt_device *m)
 
         EXIT;
 out:
-        if (rc == 0) {
-                CWARN("initialized meta-sequence: "DSEQ"\n",
-                      PSEQ(&m->mdt_msq));
-        }
+        if (rc == 0)
+                CWARN("last used sequence: "LPU64"\n", m->mdt_seq);
         return rc;
 }
 
@@ -1061,7 +1081,7 @@ static int mdt_init0(struct mdt_device *m,
         if (m->mdt_child)
                 mdt_child = md2lu_dev(m->mdt_child);
         
-        spin_lock_init(&m->mdt_msq_lock);
+        sema_init(&m->mdt_seq_sem, 1);
 
         m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
         m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
@@ -1111,8 +1131,8 @@ static int mdt_init0(struct mdt_device *m,
                 LBUG();
         }
 
-        /* init meta-sequence info after device stack is initialized. */
-        rc = mdt_init_metaseq(m);
+        /* init sequence info after device stack is initialized. */
+        rc = mdt_init_seq(m);
         if (rc)
                 GOTO(err_fini_child, rc);
 
@@ -1241,8 +1261,8 @@ static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
         med->med_mcd = mcd;
 
-        rc = mdt_alloc_metaseq(mdt_dev(obd->obd_lu_dev),
-                               &data->ocd_msq);
+        rc = mdt_alloc_seq(mdt_dev(obd->obd_lu_dev),
+                           &data->ocd_seq);
         if (rc)
                 GOTO(out, rc);
 out:
index 2ce2a77..5bece66 100644 (file)
@@ -77,8 +77,8 @@ struct mdt_device {
         unsigned long              mdt_flags;
 
         /* Seq management related stuff */
-        spinlock_t                 mdt_msq_lock
-        struct lu_msq              mdt_msq;
+        struct semaphore           mdt_seq_sem
+        __u64                      mdt_seq;
 };
 
 static inline struct md_device_operations *mdt_child_ops(struct mdt_device * m)
@@ -168,7 +168,7 @@ struct mdt_thread_info {
 
 };
 
-int mdt_alloc_metaseq(struct mdt_device *m, struct lu_msq *msq);
+int mdt_alloc_seq(struct mdt_device *, __u64 *);
 
 int fid_lock(struct ldlm_namespace *, const struct lu_fid *,
              struct lustre_handle *, ldlm_mode_t,
index a528107..0498c34 100644 (file)
@@ -198,7 +198,7 @@ static __u32 fid_hash(const struct lu_fid *f)
 {
         /* all objects with same id and different versions will belong to same
          * collisions list. */
-        return fid_seq(f) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f);
+        return (fid_seq(f) - 1) * LUSTRE_FID_SEQ_WIDTH + fid_oid(f);
 }
 
 struct lu_object *lu_object_find(struct lu_site *s, const struct lu_fid *f)
index 0a233de..16056a8 100644 (file)
@@ -501,12 +501,6 @@ void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, int min_size,
  * lustre_idl.h implemented here.
  */
 
-void lustre_swab_msq(struct lu_msq *msq)
-{
-        __swab64s (&msq->m_ran);
-        __swab32s (&msq->m_seq);
-}
-
 void lustre_swab_connect(struct obd_connect_data *ocd)
 {
         __swab64s (&ocd->ocd_connect_flags);
@@ -515,10 +509,11 @@ void lustre_swab_connect(struct obd_connect_data *ocd)
         __swab32s (&ocd->ocd_index);
         __swab32s (&ocd->ocd_unused);
         __swab64s (&ocd->ocd_ibits_known);
-        lustre_swab_msq(&ocd->ocd_msq);
+        __swab64s (&ocd->ocd_seq);
         CLASSERT(offsetof(typeof(*ocd), padding2) != 0);
         CLASSERT(offsetof(typeof(*ocd), padding3) != 0);
         CLASSERT(offsetof(typeof(*ocd), padding4) != 0);
+        CLASSERT(offsetof(typeof(*ocd), padding5) != 0);
 }
 
 void lustre_swab_obdo (struct obdo  *o)