Whamcloud - gitweb
- cleanups in seq-mgr, fixes;
authoryury <yury>
Mon, 19 Jun 2006 14:37:59 +0000 (14:37 +0000)
committeryury <yury>
Mon, 19 Jun 2006 14:37:59 +0000 (14:37 +0000)
- client allocates new meta-seq uppon connect.

lustre/fid/fid_handler.c
lustre/fid/fid_request.c
lustre/fld/fld_request.c
lustre/include/dt_object.h
lustre/include/lustre/lustre_idl.h
lustre/include/md_object.h
lustre/mdc/mdc_request.c
lustre/mdd/mdd_handler.c
lustre/osd/osd_handler.c

index efac264..3c70812 100644 (file)
@@ -97,14 +97,14 @@ seq_server_alloc_super(struct lu_server_seq *seq,
 
         LASSERT(range_is_sane(space));
         
-        if (space->lr_end - space->lr_start < LUSTRE_SEQ_SUPER_CHUNK) {
+        if (range_space(space) < LUSTRE_SEQ_SUPER_CHUNK) {
                 CWARN("sequences space is going to exhauste soon. "
                       "Only can allocate "LPU64" sequences\n",
                       space->lr_end - space->lr_start);
                 *range = *space;
                 space->lr_start = space->lr_end;
                 rc = 0;
-        } else if (space->lr_start == space->lr_end) {
+        } else if (range_is_exhausted(space)) {
                 CERROR("sequences space is exhausted\n");
                 rc = -ENOSPC;
         } else {
@@ -132,9 +132,9 @@ seq_server_alloc_meta(struct lu_server_seq *seq,
 
         LASSERT(range_is_sane(super));
 
-        /* XXX: here should avoid cascading RPCs using kind of async
+        /* XXX: here we should avoid cascading RPCs using kind of async
          * preallocation when meta-sequence is close to exhausting. */
-        if (super->lr_start == super->lr_end) {
+        if (range_is_exhausted(super)) {
                 if (!seq->seq_cli) {
                         CERROR("no seq-controller client is setup\n");
                         RETURN(-EOPNOTSUPP);
@@ -288,7 +288,7 @@ seq_server_controller(struct lu_server_seq *seq,
                       struct lu_client_seq *cli,
                       const struct lu_context *ctx)
 {
-        int rc;
+        int rc = 0;
         ENTRY;
         
         LASSERT(cli != NULL);
@@ -303,32 +303,39 @@ seq_server_controller(struct lu_server_seq *seq,
                "sequence controller client %s\n",
                cli->seq_exp->exp_client_uuid.uuid);
 
+        down(&seq->seq_sem);
+        
         /* assign controller */
         seq->seq_cli = cli;
 
-        /* allocate new super-sequence */
-        rc = seq_client_alloc_super(cli);
-        if (rc) {
-                CERROR("can't allocate super-sequence, "
-                       "rc %d\n", rc);
-                RETURN(rc);
-        }
+        /* get new range from controller only if super-sequence is not yet
+         * initialized from backing store or something else. */
+        if (range_is_zero(&seq->seq_super)) {
+                /* release sema to avoid deadlock for case we're asking our
+                 * selves. */
+                up(&seq->seq_sem);
+                rc = seq_client_alloc_super(cli);
+                down(&seq->seq_sem);
+                
+                if (rc) {
+                        CERROR("can't allocate super-sequence, "
+                               "rc %d\n", rc);
+                        RETURN(rc);
+                }
 
-        /* take super-seq from client seq mgr */
-        LASSERT(range_is_sane(&cli->seq_range));
+                /* take super-seq from client seq mgr */
+                LASSERT(range_is_sane(&cli->seq_range));
 
-        down(&seq->seq_sem);
-        seq->seq_super = cli->seq_range;
+                seq->seq_super = cli->seq_range;
 
-        /* save init seq to backing store. */
-        rc = seq_server_write_state(seq, ctx);
-        if (rc) {
-                CERROR("can't write sequence state, "
-                       "rc = %d\n", rc);
+                /* save init seq to backing store. */
+                rc = seq_server_write_state(seq, ctx);
+                if (rc) {
+                        CERROR("can't write sequence state, "
+                               "rc = %d\n", rc);
+                }
         }
-
         up(&seq->seq_sem);
-        
         RETURN(rc);
 }
 EXPORT_SYMBOL(seq_server_controller);
index 1e75949..226b734 100644 (file)
@@ -53,6 +53,7 @@ seq_client_rpc(struct lu_client_seq *seq,
                struct lu_range *range,
                __u32 opc)
 {
+        struct obd_export *exp = seq->seq_exp;
         int repsize = sizeof(struct lu_range);
         int rc, reqsize = sizeof(__u32);
         struct ptlrpc_request *req;
@@ -60,9 +61,10 @@ seq_client_rpc(struct lu_client_seq *seq,
         __u32 *op;
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(seq->seq_exp), 
-                             LUSTRE_MDS_VERSION, SEQ_QUERY,
-                             1, &reqsize, NULL);
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), 
+                             LUSTRE_MDS_VERSION,
+                              SEQ_QUERY, 1, &reqsize,
+                              NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -71,6 +73,7 @@ seq_client_rpc(struct lu_client_seq *seq,
 
         req->rq_replen = lustre_msg_size(1, &repsize);
         req->rq_request_portal = MDS_SEQ_PORTAL;
+
         rc = ptlrpc_queue_wait(req);
         if (rc)
                 GOTO(out_req, rc);
@@ -90,14 +93,13 @@ out_req:
 }
 
 /* request sequence-controller node to allocate new super-sequence. */
-int
-seq_client_alloc_super(struct lu_client_seq *seq)
+static int
+__seq_client_alloc_super(struct lu_client_seq *seq)
 {
         int rc;
         ENTRY;
 
-        rc = seq_client_rpc(seq, &seq->seq_range,
-                            SEQ_ALLOC_SUPER);
+        rc = seq_client_rpc(seq, &seq->seq_range, SEQ_ALLOC_SUPER);
         if (rc == 0) {
                 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated super-sequence "
                        "["LPX64"-"LPX64"]\n", seq->seq_range.lr_start,
@@ -105,17 +107,29 @@ seq_client_alloc_super(struct lu_client_seq *seq)
         }
         RETURN(rc);
 }
+
+int
+seq_client_alloc_super(struct lu_client_seq *seq)
+{
+        int rc;
+        ENTRY;
+        
+        down(&seq->seq_sem);
+        rc = __seq_client_alloc_super(seq);
+        up(&seq->seq_sem);
+
+        RETURN(rc);
+}
 EXPORT_SYMBOL(seq_client_alloc_super);
 
 /* request sequence-controller node to allocate new meta-sequence. */
-int
-seq_client_alloc_meta(struct lu_client_seq *seq)
+static int
+__seq_client_alloc_meta(struct lu_client_seq *seq)
 {
         int rc;
         ENTRY;
 
-        rc = seq_client_rpc(seq, &seq->seq_range,
-                            SEQ_ALLOC_META);
+        rc = seq_client_rpc(seq, &seq->seq_range, SEQ_ALLOC_META);
         if (rc == 0) {
                 CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated meta-sequence "
                        "["LPX64"-"LPX64"]\n", seq->seq_range.lr_start,
@@ -123,44 +137,62 @@ seq_client_alloc_meta(struct lu_client_seq *seq)
         }
         RETURN(rc);
 }
-EXPORT_SYMBOL(seq_client_alloc_meta);
 
-/* allocate new sequence for client (llite or MDC are expected to use this) */
 int
-seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+seq_client_alloc_meta(struct lu_client_seq *seq)
 {
         int rc;
         ENTRY;
 
         down(&seq->seq_sem);
+        rc = __seq_client_alloc_meta(seq);
+        up(&seq->seq_sem);
+
+        RETURN(rc);
+}
+EXPORT_SYMBOL(seq_client_alloc_meta);
+
+/* allocate new sequence for client (llite or MDC are expected to use this) */
+static int
+__seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+{
+        int rc = 0;
+        ENTRY;
+
         LASSERT(range_is_sane(&seq->seq_range));
 
         /* if we still have free sequences in meta-sequence we allocate new seq
-         * from given range. */
-        if (seq->seq_range.lr_end > seq->seq_range.lr_start) {
-                *seqnr = seq->seq_range.lr_start;
-                seq->seq_range.lr_start += 1;
-                rc = 0;
-        } else {
-                /* meta-sequence is exhausted, request MDT to allocate new
-                 * meta-sequence for us. */
-                rc = seq_client_alloc_meta(seq);
+         * from given range, if not - allocate new meta-sequence. */
+        if (range_space(&seq->seq_range) == 0) {
+                rc = __seq_client_alloc_meta(seq);
                 if (rc) {
                         CERROR("can't allocate new meta-sequence, "
                                "rc %d\n", rc);
                 }
-                
-                *seqnr = seq->seq_range.lr_start;
-                seq->seq_range.lr_start += 1;
         }
-        up(&seq->seq_sem);
-
+        
+        *seqnr = seq->seq_range.lr_start;
+        seq->seq_range.lr_start += 1;
+        
         if (rc == 0) {
-                CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated sequence "
-                       "["LPX64"]\n", *seqnr);
+                CDEBUG(D_INFO|D_WARNING, "SEQ-MGR(cli): allocated "
+                       "sequence ["LPX64"]\n", *seqnr);
         }
         RETURN(rc);
 }
+
+int
+seq_client_alloc_seq(struct lu_client_seq *seq, __u64 *seqnr)
+{
+        int rc = 0;
+        ENTRY;
+
+        down(&seq->seq_sem);
+        rc = __seq_client_alloc_seq(seq, seqnr);
+        up(&seq->seq_sem);
+
+        RETURN(rc);
+}
 EXPORT_SYMBOL(seq_client_alloc_seq);
 
 int
@@ -179,9 +211,7 @@ seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
         {
                 /* allocate new sequence for case client hass no sequence at all
                  * or sequnece is exhausted and should be switched. */
-                up(&seq->seq_sem);
-                rc = seq_client_alloc_seq(seq, &seqnr);
-                down(&seq->seq_sem);
+                rc = __seq_client_alloc_seq(seq, &seqnr);
                 if (rc) {
                         CERROR("can't allocate new sequence, "
                                "rc %d\n", rc);
@@ -225,13 +255,9 @@ seq_client_init(struct lu_client_seq *seq,
         
         seq->seq_flags = flags;
         fid_zero(&seq->seq_fid);
+        range_zero(&seq->seq_range);
         sema_init(&seq->seq_sem, 1);
-        
-        seq->seq_range.lr_end = 0;
-        seq->seq_range.lr_start = 0;
-       
-        if (exp != NULL)
-                seq->seq_exp = class_export_get(exp);
+        seq->seq_exp = class_export_get(exp);
 
         CDEBUG(D_INFO|D_WARNING, "Client Sequence "
                "Manager initialized\n");
index 201841e..2d934b5 100644 (file)
@@ -324,6 +324,7 @@ fld_client_rpc(struct obd_export *exp,
 
         req->rq_replen = lustre_msg_size(1, &mf_size);
         req->rq_request_portal = MDS_FLD_PORTAL;
+
         rc = ptlrpc_queue_wait(req);
         if (rc)
                 GOTO(out_req, rc);
index 4098bc9..e0b5d23 100644 (file)
@@ -67,15 +67,6 @@ enum dt_lock_mode {
  */
 struct dt_device_operations {
         /*
-         * Method for getting/setting device wide back stored config data,
-         * like last used meta-sequence, etc.
-         *
-         * XXX this is ioctl()-like interface we want to get rid of.
-         */
-        int (*dt_config) (const struct lu_context *ctx,
-                          struct dt_device *dev, const char *name,
-                          void *buf, int size, int mode);
-        /*
          * Return device-wide statistics.
          */
         int   (*dt_statfs)(const struct lu_context *ctx,
index a6cef26..69856fe 100644 (file)
@@ -129,6 +129,16 @@ struct lu_range {
         __u64 lr_end;
 };
 
+static inline __u64 range_space(struct lu_range *r)
+{
+        return r->lr_end - r->lr_start;
+}
+
+static inline void range_zero(struct lu_range *r)
+{
+        r->lr_start = r->lr_end = 0;
+}
+
 static inline int range_is_sane(struct lu_range *r)
 {
         if (r->lr_end >= r->lr_start)
@@ -136,6 +146,16 @@ static inline int range_is_sane(struct lu_range *r)
         return 0;
 }
 
+static inline int range_is_zero(struct lu_range *r)
+{
+        return (r->lr_start == 0 && r->lr_end == 0);
+}
+
+static inline int range_is_exhausted(struct lu_range *r)
+{
+        return range_space(r) == 0;
+}
+
 struct lu_fid {
         __u64 f_seq;  /* holds fid sequence. Lustre should support 2 ^ 64
                        * objects, thus even if one sequence has one object we
index 32f2aec..19422de 100644 (file)
@@ -103,12 +103,6 @@ struct md_dir_operations {
 };
 
 struct md_device_operations {
-        /* method for getting/setting device wide back stored config data, like
-         * last used meta-sequence, etc. */
-        int (*mdo_config) (const struct lu_context *ctx,
-                           struct md_device *m, const char *name,
-                           void *buf, int size, int mode);
-
         /* meta-data device related handlers. */
         int (*mdo_root_get)(const struct lu_context *ctx,
                             struct md_device *m, struct lu_fid *f);
index a35bd4d..344dddb 100644 (file)
@@ -1066,13 +1066,24 @@ static int mdc_fid_init(struct obd_export *exp)
         if (cli->cl_seq == NULL)
                 RETURN(-ENOMEM);
 
-        /* init client side of sequence-manager */
+        /* init client side sequence-manager */
         rc = seq_client_init(cli->cl_seq, exp, 0);
+        if (rc)
+                GOTO(out_free_seq, rc);
+
+        /* pre-allocate meta-sequence */
+        rc = seq_client_alloc_meta(cli->cl_seq);
         if (rc) {
-                OBD_FREE_PTR(cli->cl_seq);
-                cli->cl_seq = NULL;
+                CERROR("can't allocate new mata-sequence, "
+                       "rc %d\n", rc);
+                GOTO(out_free_seq, rc);
         }
         RETURN(rc);
+
+out_free_seq:
+        OBD_FREE_PTR(cli->cl_seq);
+        cli->cl_seq = NULL;
+        return rc;
 }
 
 static int mdc_fid_fini(struct obd_export *exp)
index 67dd199..973959e 100644 (file)
@@ -887,18 +887,6 @@ static int mdd_root_get(const struct lu_context *ctx,
         RETURN(0);
 }
 
-static int mdd_config(const struct lu_context *ctx, struct md_device *m,
-                      const char *name, void *buf, int size, int mode)
-{
-        struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
-        int rc;
-        ENTRY;
-
-        rc = mdd_child_ops(mdd)->dt_config(ctx, mdd->mdd_child,
-                                           name, buf, size, mode);
-        RETURN(rc);
-}
-
 static int mdd_statfs(const struct lu_context *ctx,
                       struct md_device *m, struct kstatfs *sfs) {
        struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
@@ -973,7 +961,6 @@ static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj)
 
 struct md_device_operations mdd_ops = {
         .mdo_root_get       = mdd_root_get,
-        .mdo_config         = mdd_config,
         .mdo_statfs         = mdd_statfs,
 };
 
index 9290c47..b6520fd 100644 (file)
@@ -296,19 +296,6 @@ static int osd_object_print(const struct lu_context *ctx,
                           o->oo_inode ? o->oo_inode->i_generation : 0);
 }
 
-static int osd_config(const struct lu_context *ctx,
-                      struct dt_device *d, const char *name,
-                      void *buf, int size, int mode)
-{
-        if (mode == LUSTRE_CONFIG_GET) {
-                /* to be continued */
-                return 0;
-        } else {
-                /* to be continued */
-                return 0;
-        }
-}
-
 static int osd_statfs(const struct lu_context *ctx,
                       struct dt_device *d, struct kstatfs *sfs)
 {
@@ -406,7 +393,6 @@ static void osd_trans_stop(const struct lu_context *ctx, struct thandle *th)
 
 static struct dt_device_operations osd_dt_ops = {
         .dt_root_get    = osd_root_get,
-        .dt_config      = osd_config,
         .dt_statfs      = osd_statfs,
         .dt_trans_start = osd_trans_start,
         .dt_trans_stop  = osd_trans_stop,