Whamcloud - gitweb
b=15957
authorpravins <pravins>
Fri, 28 Nov 2008 09:17:15 +0000 (09:17 +0000)
committerpravins <pravins>
Fri, 28 Nov 2008 09:17:15 +0000 (09:17 +0000)
i=Nikita
i=umka

compact FLD feature.

30 files changed:
lustre/ChangeLog
lustre/cmm/cmm_device.c
lustre/cmm/cmm_object.c
lustre/cmm/cmm_split.c
lustre/fid/fid_handler.c
lustre/fid/fid_internal.h
lustre/fid/fid_lib.c
lustre/fid/fid_request.c
lustre/fid/fid_store.c
lustre/fid/lproc_fid.c
lustre/fld/fld_cache.c
lustre/fld/fld_handler.c
lustre/fld/fld_index.c
lustre/fld/fld_internal.h
lustre/fld/fld_request.c
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_fid.h
lustre/include/lustre_fld.h
lustre/lmv/lmv_fld.c
lustre/lmv/lmv_intent.c
lustre/lmv/lmv_obd.c
lustre/mdt/mdt_handler.c
lustre/obdclass/llog_swab.c
lustre/osd/osd_handler.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/ptlrpc_module.c
lustre/tests/test-framework.sh
lustre/utils/req-layout.c
lustre/utils/wiretest.c

index 3fb9d7d..3df013c 100644 (file)
@@ -12,7 +12,15 @@ tbd  Sun Microsystems, Inc.
        * RHEL 4 and RHEL 5/SLES 10 clients behaves differently on 'cd' to a
         removed cwd "./" (refer to Bugzilla 14399).
        * File join has been disabled in this release, refer to Bugzilla 16929.
        * RHEL 4 and RHEL 5/SLES 10 clients behaves differently on 'cd' to a
         removed cwd "./" (refer to Bugzilla 14399).
        * File join has been disabled in this release, refer to Bugzilla 16929.
+
+Severity   : enhancement
+Bugzilla   : 15957
+Description: compact fld format with extents
+Details    : Store range of seq rather than every seq in FLD. Seq
+             controller update FLD rather than clients. In Case of CMD, mdt0
+             has FLD, all other metadata server act as non persistent proxy
+             for FLD queries and cache fld entries in fld cache.
+
 Severity   : normal
 Frequency  : rare
 Bugzilla   : 16081
 Severity   : normal
 Frequency  : rare
 Bugzilla   : 16081
index a74fdf4..08a435e 100644 (file)
@@ -447,6 +447,7 @@ static int cmm_add_mdc(const struct lu_env *env,
         struct lu_device *ld;
         struct lu_device *cmm_lu = cmm2lu_dev(cm);
         mdsno_t mdc_num;
         struct lu_device *ld;
         struct lu_device *cmm_lu = cmm2lu_dev(cm);
         mdsno_t mdc_num;
+        struct lu_site *site = cmm2lu_dev(cm)->ld_site;
         int rc;
         ENTRY;
 
         int rc;
         ENTRY;
 
@@ -471,7 +472,7 @@ static int cmm_add_mdc(const struct lu_env *env,
         if (IS_ERR(ld))
                 RETURN(PTR_ERR(ld));
 
         if (IS_ERR(ld))
                 RETURN(PTR_ERR(ld));
 
-        ld->ld_site = cmm2lu_dev(cm)->ld_site;
+        ld->ld_site = site;
 
         rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
         if (rc) {
 
         rc = ldt->ldt_ops->ldto_device_init(env, ld, NULL, NULL);
         if (rc) {
@@ -509,6 +510,13 @@ static int cmm_add_mdc(const struct lu_env *env,
         target.ft_exp = mc->mc_desc.cl_exp;
         fld_client_add_target(cm->cmm_fld, &target);
 
         target.ft_exp = mc->mc_desc.cl_exp;
         fld_client_add_target(cm->cmm_fld, &target);
 
+        if (mc->mc_num == 0) {
+                /* this is mdt0 -> mc export, fld lookup need this export
+                   to forward fld lookup request. */
+                LASSERT(!lu_site2md(site)->ms_server_fld->lsf_control_exp);
+                lu_site2md(site)->ms_server_fld->lsf_control_exp =
+                                          mc->mc_desc.cl_exp;
+        }
         /* Set max md size for the mdc. */
         rc = cmm_post_init_mdc(env, cm);
         RETURN(rc);
         /* Set max md size for the mdc. */
         rc = cmm_post_init_mdc(env, cm);
         RETURN(rc);
index 73c9dad..7cbf87d 100644 (file)
@@ -66,12 +66,12 @@ int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
         }
 
         if (*mds > cm->cmm_tgt_count) {
         }
 
         if (*mds > cm->cmm_tgt_count) {
-                CERROR("Got invalid mdsno: "LPU64" (max: %u)\n",
+                CERROR("Got invalid mdsno: %x (max: %x)\n",
                        *mds, cm->cmm_tgt_count);
                 rc = -EINVAL;
         } else {
                        *mds, cm->cmm_tgt_count);
                 rc = -EINVAL;
         } else {
-                CDEBUG(D_INFO, "CMM: got MDS "LPU64" for sequence: "
-                       LPU64"\n", *mds, fid_seq(fid));
+                CDEBUG(D_INFO, "CMM: got MDS %x for sequence: "
+                       LPX64"\n", *mds, fid_seq(fid));
         }
 
         RETURN (rc);
         }
 
         RETURN (rc);
index 361b38d..8cb4cd9 100644 (file)
@@ -268,13 +268,8 @@ static int cmm_split_fid_alloc(const struct lu_env *env,
 
         /* Alloc new fid on @mc. */
         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
 
         /* Alloc new fid on @mc. */
         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
-        if (rc > 0) {
-                /* Setup FLD for new sequenceif needed. */
-                rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
-                                       mc->mc_num, env);
-                if (rc)
-                        CERROR("Can't create fld entry, rc %d\n", rc);
-        }
+        if (rc > 0)
+                rc = 0;
         up(&mc->mc_fid_sem);
 
         RETURN(rc);
         up(&mc->mc_fid_sem);
 
         RETURN(rc);
index 9e3d7ad..5af9343 100644 (file)
@@ -93,6 +93,7 @@ int seq_server_set_cli(struct lu_server_seq *seq,
                seq->lss_name, cli->lcs_name);
 
         seq->lss_cli = cli;
                seq->lss_name, cli->lcs_name);
 
         seq->lss_cli = cli;
+        cli->lcs_space.lsr_mdt = seq->lss_site->ms_node_id;
         EXIT;
 out_up:
         up(&seq->lss_sem);
         EXIT;
 out_up:
         up(&seq->lss_sem);
@@ -100,16 +101,22 @@ out_up:
 }
 EXPORT_SYMBOL(seq_server_set_cli);
 
 }
 EXPORT_SYMBOL(seq_server_set_cli);
 
-/*
+/**
  * On controller node, allocate new super sequence for regular sequence server.
  * On controller node, allocate new super sequence for regular sequence server.
+ * As this super sequence controller, this node suppose to maintain fld
+ * and update index.
+ * \a out range always has currect mds node number of requester.
  */
  */
+
 static int __seq_server_alloc_super(struct lu_server_seq *seq,
 static int __seq_server_alloc_super(struct lu_server_seq *seq,
-                                    struct lu_range *in,
-                                    struct lu_range *out,
+                                    struct lu_seq_range *in,
+                                    struct lu_seq_range *out,
                                     const struct lu_env *env)
 {
                                     const struct lu_env *env)
 {
-        struct lu_range *space = &seq->lss_space;
-        int rc;
+        struct lu_seq_range *space = &seq->lss_space;
+        struct thandle *th;
+        __u64 mdt = out->lsr_mdt;
+        int rc, credit;
         ENTRY;
 
         LASSERT(range_is_sane(space));
         ENTRY;
 
         LASSERT(range_is_sane(space));
@@ -118,8 +125,8 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq,
                 CDEBUG(D_INFO, "%s: Input seq range: "
                        DRANGE"\n", seq->lss_name, PRANGE(in));
 
                 CDEBUG(D_INFO, "%s: Input seq range: "
                        DRANGE"\n", seq->lss_name, PRANGE(in));
 
-                if (in->lr_end > space->lr_start)
-                        space->lr_start = in->lr_end;
+                if (in->lsr_end > space->lsr_start)
+                        space->lsr_start = in->lsr_end;
                 *out = *in;
 
                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
                 *out = *in;
 
                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
@@ -130,7 +137,7 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq,
                               "Only "LPU64" sequences left\n", seq->lss_name,
                               range_space(space));
                         *out = *space;
                               "Only "LPU64" sequences left\n", seq->lss_name,
                               range_space(space));
                         *out = *space;
-                        space->lr_start = space->lr_end;
+                        space->lsr_start = space->lsr_end;
                 } else if (range_is_exhausted(space)) {
                         CERROR("%s: Sequences space is exhausted\n",
                                seq->lss_name);
                 } else if (range_is_exhausted(space)) {
                         CERROR("%s: Sequences space is exhausted\n",
                                seq->lss_name);
@@ -139,23 +146,40 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq,
                         range_alloc(out, space, seq->lss_width);
                 }
         }
                         range_alloc(out, space, seq->lss_width);
                 }
         }
+        out->lsr_mdt = mdt;
+
+        credit = SEQ_TXN_STORE_CREDITS + FLD_TXN_INDEX_INSERT_CREDITS;
+
+        th = seq_store_trans_start(seq, env, credit);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
 
 
-        rc = seq_store_write(seq, env);
+        rc = seq_store_write(seq, env, th);
         if (rc) {
                 CERROR("%s: Can't write space data, rc %d\n",
                        seq->lss_name, rc);
         if (rc) {
                 CERROR("%s: Can't write space data, rc %d\n",
                        seq->lss_name, rc);
-                RETURN(rc);
+                goto out;
         }
 
         }
 
-        CDEBUG(D_INFO, "%s: Allocated super-sequence "
-               DRANGE"\n", seq->lss_name, PRANGE(out));
+        rc = fld_server_create(seq->lss_site->ms_server_fld,
+                               env, out, th);
+        if (rc) {
+                CERROR("%s: Can't Update fld database, rc %d\n",
+                       seq->lss_name, rc);
+        }
+
+out:
+        seq_store_trans_stop(seq, env, th);
+
+        CDEBUG(D_INFO, "%s: super-sequence allocation rc = %d "
+               DRANGE"\n", seq->lss_name, rc, PRANGE(out));
 
         RETURN(rc);
 }
 
 int seq_server_alloc_super(struct lu_server_seq *seq,
 
         RETURN(rc);
 }
 
 int seq_server_alloc_super(struct lu_server_seq *seq,
-                           struct lu_range *in,
-                           struct lu_range *out,
+                           struct lu_seq_range *in,
+                           struct lu_seq_range *out,
                            const struct lu_env *env)
 {
         int rc;
                            const struct lu_env *env)
 {
         int rc;
@@ -169,12 +193,14 @@ int seq_server_alloc_super(struct lu_server_seq *seq,
 }
 
 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
 }
 
 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
-                                   struct lu_range *in,
-                                   struct lu_range *out,
+                                   struct lu_seq_range *in,
+                                   struct lu_seq_range *out,
                                    const struct lu_env *env)
 {
                                    const struct lu_env *env)
 {
-        struct lu_range *space = &seq->lss_space;
+        struct lu_seq_range *space = &seq->lss_space;
+        struct thandle *th;
         int rc = 0;
         int rc = 0;
+
         ENTRY;
 
         LASSERT(range_is_sane(space));
         ENTRY;
 
         LASSERT(range_is_sane(space));
@@ -193,22 +219,22 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                          * we check here that range from client is "newer" than
                          * exhausted super.
                          */
                          * we check here that range from client is "newer" than
                          * exhausted super.
                          */
-                        LASSERT(in->lr_end > space->lr_start);
+                        LASSERT(in->lsr_end > space->lsr_start);
 
                         /*
                          * Start is set to end of last allocated, because it
                          * *is* already allocated so we take that into account
                          * and do not use for other allocations.
                          */
 
                         /*
                          * Start is set to end of last allocated, because it
                          * *is* already allocated so we take that into account
                          * and do not use for other allocations.
                          */
-                        space->lr_start = in->lr_end;
+                        space->lsr_start = in->lsr_end;
 
                         /*
 
                         /*
-                         * End is set to in->lr_start + super sequence
-                         * allocation unit. That is because in->lr_start is
+                         * End is set to in->lsr_start + super sequence
+                         * allocation unit. That is because in->lsr_start is
                          * first seq in new allocated range from controller
                          * before failure.
                          */
                          * first seq in new allocated range from controller
                          * before failure.
                          */
-                        space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH;
+                        space->lsr_end = in->lsr_start + LUSTRE_SEQ_SUPER_WIDTH;
 
                         if (!seq->lss_cli) {
                                 CERROR("%s: No sequence controller "
 
                         if (!seq->lss_cli) {
                                 CERROR("%s: No sequence controller "
@@ -221,6 +247,7 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                          * obtained range from it was @space.
                          */
                         rc = seq_client_replay_super(seq->lss_cli, space, env);
                          * obtained range from it was @space.
                          */
                         rc = seq_client_replay_super(seq->lss_cli, space, env);
+
                         if (rc) {
                                 CERROR("%s: Can't replay super-sequence, "
                                        "rc %d\n", seq->lss_name, rc);
                         if (rc) {
                                 CERROR("%s: Can't replay super-sequence, "
                                        "rc %d\n", seq->lss_name, rc);
@@ -231,8 +258,8 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                          * Update super start by end from client's range. Super
                          * end should not be changed if range was not exhausted.
                          */
                          * Update super start by end from client's range. Super
                          * end should not be changed if range was not exhausted.
                          */
-                        if (in->lr_end > space->lr_start)
-                                space->lr_start = in->lr_end;
+                        if (in->lsr_end > space->lsr_start)
+                                space->lsr_start = in->lsr_end;
                 }
 
                 *out = *in;
                 }
 
                 *out = *in;
@@ -266,7 +293,11 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                 range_alloc(out, space, seq->lss_width);
         }
 
                 range_alloc(out, space, seq->lss_width);
         }
 
-        rc = seq_store_write(seq, env);
+        th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+
+        rc = seq_store_write(seq, env, th);
         if (rc) {
                 CERROR("%s: Can't write space data, rc %d\n",
                       seq->lss_name, rc);
         if (rc) {
                 CERROR("%s: Can't write space data, rc %d\n",
                       seq->lss_name, rc);
@@ -277,12 +308,13 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                        DRANGE"\n", seq->lss_name, PRANGE(out));
         }
 
                        DRANGE"\n", seq->lss_name, PRANGE(out));
         }
 
+        seq_store_trans_stop(seq, env, th);
         RETURN(rc);
 }
 
 int seq_server_alloc_meta(struct lu_server_seq *seq,
         RETURN(rc);
 }
 
 int seq_server_alloc_meta(struct lu_server_seq *seq,
-                          struct lu_range *in,
-                          struct lu_range *out,
+                          struct lu_seq_range *in,
+                          struct lu_seq_range *out,
                           const struct lu_env *env)
 {
         int rc;
                           const struct lu_env *env)
 {
         int rc;
@@ -298,8 +330,8 @@ EXPORT_SYMBOL(seq_server_alloc_meta);
 
 static int seq_server_handle(struct lu_site *site,
                              const struct lu_env *env,
 
 static int seq_server_handle(struct lu_site *site,
                              const struct lu_env *env,
-                             __u32 opc, struct lu_range *in,
-                             struct lu_range *out)
+                             __u32 opc, struct lu_seq_range *in,
+                             struct lu_seq_range *out)
 {
         int rc;
         struct md_site *mite;
 {
         int rc;
         struct md_site *mite;
@@ -337,7 +369,7 @@ static int seq_req_handle(struct ptlrpc_request *req,
                           const struct lu_env *env,
                           struct seq_thread_info *info)
 {
                           const struct lu_env *env,
                           struct seq_thread_info *info)
 {
-        struct lu_range *out, *in = NULL;
+        struct lu_seq_range *out, *in = NULL, *tmp;
         struct lu_site *site;
         int rc = -EPROTO;
         __u32 *opc;
         struct lu_site *site;
         int rc = -EPROTO;
         __u32 *opc;
@@ -356,13 +388,16 @@ static int seq_req_handle(struct ptlrpc_request *req,
                 if (out == NULL)
                         RETURN(err_serious(-EPROTO));
 
                 if (out == NULL)
                         RETURN(err_serious(-EPROTO));
 
-                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
-                        in = req_capsule_client_get(info->sti_pill,
-                                                    &RMF_SEQ_RANGE);
+                tmp = req_capsule_client_get(info->sti_pill, &RMF_SEQ_RANGE);
 
 
+                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                        in = tmp;
                         LASSERT(!range_is_zero(in) && range_is_sane(in));
                 }
                         LASSERT(!range_is_zero(in) && range_is_sane(in));
                 }
+                /* seq client passed mdt id, we need to pass that using out
+                 * range parameter */
 
 
+                out->lsr_mdt = tmp->lsr_mdt;
                 rc = seq_server_handle(site, env, *opc, in, out);
         } else
                 rc = err_serious(-EPROTO);
                 rc = seq_server_handle(site, env, *opc, in, out);
         } else
                 rc = err_serious(-EPROTO);
@@ -475,8 +510,10 @@ int seq_server_init(struct lu_server_seq *seq,
                     struct dt_device *dev,
                     const char *prefix,
                     enum lu_mgr_type type,
                     struct dt_device *dev,
                     const char *prefix,
                     enum lu_mgr_type type,
+                    struct md_site *ms,
                     const struct lu_env *env)
 {
                     const struct lu_env *env)
 {
+        struct thandle *th;
         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
         ENTRY;
 
         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
         ENTRY;
 
@@ -485,6 +522,7 @@ int seq_server_init(struct lu_server_seq *seq,
 
         seq->lss_cli = NULL;
         seq->lss_type = type;
 
         seq->lss_cli = NULL;
         seq->lss_type = type;
+        seq->lss_site = ms;
         range_init(&seq->lss_space);
         sema_init(&seq->lss_sem, 1);
 
         range_init(&seq->lss_space);
         sema_init(&seq->lss_sem, 1);
 
@@ -497,7 +535,6 @@ int seq_server_init(struct lu_server_seq *seq,
         rc = seq_store_init(seq, env, dev);
         if (rc)
                 GOTO(out, rc);
         rc = seq_store_init(seq, env, dev);
         if (rc)
                 GOTO(out, rc);
-
         /* Request backing store for saved sequence info. */
         rc = seq_store_read(seq, env);
         if (rc == -ENODATA) {
         /* Request backing store for saved sequence info. */
         rc = seq_store_read(seq, env);
         if (rc == -ENODATA) {
@@ -507,16 +544,22 @@ int seq_server_init(struct lu_server_seq *seq,
                         LUSTRE_SEQ_ZERO_RANGE:
                         LUSTRE_SEQ_SPACE_RANGE;
 
                         LUSTRE_SEQ_ZERO_RANGE:
                         LUSTRE_SEQ_SPACE_RANGE;
 
+                seq->lss_space.lsr_mdt = ms->ms_node_id;
                 CDEBUG(D_INFO, "%s: No data found "
                        "on store. Initialize space\n",
                        seq->lss_name);
 
                 CDEBUG(D_INFO, "%s: No data found "
                        "on store. Initialize space\n",
                        seq->lss_name);
 
+                th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
+                if (IS_ERR(th))
+                        RETURN(PTR_ERR(th));
+
                 /* Save default controller value to store. */
                 /* Save default controller value to store. */
-                rc = seq_store_write(seq, env);
+                rc = seq_store_write(seq, env, th);
                 if (rc) {
                         CERROR("%s: Can't write space data, "
                                "rc %d\n", seq->lss_name, rc);
                 }
                 if (rc) {
                         CERROR("%s: Can't write space data, "
                                "rc %d\n", seq->lss_name, rc);
                 }
+                seq_store_trans_stop(seq, env, th);
         } else if (rc) {
                CERROR("%s: Can't read space data, rc %d\n",
                       seq->lss_name, rc);
         } else if (rc) {
                CERROR("%s: Can't read space data, rc %d\n",
                       seq->lss_name, rc);
index 9137656..03c5227 100644 (file)
 struct seq_thread_info {
         struct req_capsule     *sti_pill;
         struct txn_param        sti_txn;
 struct seq_thread_info {
         struct req_capsule     *sti_pill;
         struct txn_param        sti_txn;
-        struct lu_range         sti_space;
+        struct lu_seq_range     sti_space;
         struct lu_buf           sti_buf;
 };
 
         struct lu_buf           sti_buf;
 };
 
+enum {
+        SEQ_TXN_STORE_CREDITS = 20
+};
+
 extern struct lu_context_key seq_thread_key;
 
 /* Functions used internally in module. */
 extern struct lu_context_key seq_thread_key;
 
 /* Functions used internally in module. */
@@ -60,7 +64,7 @@ int seq_client_alloc_super(struct lu_client_seq *seq,
                            const struct lu_env *env);
 
 int seq_client_replay_super(struct lu_client_seq *seq,
                            const struct lu_env *env);
 
 int seq_client_replay_super(struct lu_client_seq *seq,
-                            struct lu_range *range,
+                            struct lu_seq_range *range,
                             const struct lu_env *env);
 
 /* Store API functions. */
                             const struct lu_env *env);
 
 /* Store API functions. */
@@ -72,11 +76,19 @@ void seq_store_fini(struct lu_server_seq *seq,
                     const struct lu_env *env);
 
 int seq_store_write(struct lu_server_seq *seq,
                     const struct lu_env *env);
 
 int seq_store_write(struct lu_server_seq *seq,
-                    const struct lu_env *env);
+                    const struct lu_env *env,
+                    struct thandle *th);
 
 int seq_store_read(struct lu_server_seq *seq,
                    const struct lu_env *env);
 
 
 int seq_store_read(struct lu_server_seq *seq,
                    const struct lu_env *env);
 
+struct thandle * seq_store_trans_start(struct lu_server_seq *seq,
+                                       const struct lu_env *env,
+                                       int credits);
+void seq_store_trans_stop(struct lu_server_seq *seq,
+                          const struct lu_env *env,
+                          struct thandle *th);
+
 #ifdef LPROCFS
 extern struct lprocfs_vars seq_server_proc_list[];
 extern struct lprocfs_vars seq_client_proc_list[];
 #ifdef LPROCFS
 extern struct lprocfs_vars seq_server_proc_list[];
 extern struct lprocfs_vars seq_client_proc_list[];
index 694ee78..76e779a 100644 (file)
  * The first 0x400 sequences of normal FID are reserved for special purpose.
  * FID_SEQ_START + 1 is for local file id generation.
  */
  * The first 0x400 sequences of normal FID are reserved for special purpose.
  * FID_SEQ_START + 1 is for local file id generation.
  */
-const struct lu_range LUSTRE_SEQ_SPACE_RANGE = {
+const struct lu_seq_range LUSTRE_SEQ_SPACE_RANGE = {
         FID_SEQ_START + 0x400ULL,
         (__u64)~0ULL
 };
 EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
 
 /* Zero range, used for init and other purposes. */
         FID_SEQ_START + 0x400ULL,
         (__u64)~0ULL
 };
 EXPORT_SYMBOL(LUSTRE_SEQ_SPACE_RANGE);
 
 /* Zero range, used for init and other purposes. */
-const struct lu_range LUSTRE_SEQ_ZERO_RANGE = {
+const struct lu_seq_range LUSTRE_SEQ_ZERO_RANGE = {
         0,
         0
 };
         0,
         0
 };
@@ -89,54 +89,3 @@ const struct lu_fid LUSTRE_BFL_FID = { .f_seq = 0x0000000000000003,
                                        .f_oid = 0x0000000000000001,
                                        .f_ver = 0x0000000000000000 };
 EXPORT_SYMBOL(LUSTRE_BFL_FID);
                                        .f_oid = 0x0000000000000001,
                                        .f_ver = 0x0000000000000000 };
 EXPORT_SYMBOL(LUSTRE_BFL_FID);
-
-void range_cpu_to_le(struct lu_range *dst, const struct lu_range *src)
-{
-        /* check that all fields are converted */
-        CLASSERT(sizeof(*src) ==
-                 sizeof(src->lr_start) +
-                 sizeof(src->lr_end) +
-                 sizeof(src->lr_padding));
-        dst->lr_start = cpu_to_le64(src->lr_start);
-        dst->lr_end = cpu_to_le64(src->lr_end);
-}
-EXPORT_SYMBOL(range_cpu_to_le);
-
-void range_le_to_cpu(struct lu_range *dst, const struct lu_range *src)
-{
-        /* check that all fields are converted */
-        CLASSERT(sizeof(*src) ==
-                 sizeof(src->lr_start) +
-                 sizeof(src->lr_end) +
-                 sizeof(src->lr_padding));
-        dst->lr_start = le64_to_cpu(src->lr_start);
-        dst->lr_end = le64_to_cpu(src->lr_end);
-}
-EXPORT_SYMBOL(range_le_to_cpu);
-
-#ifdef __KERNEL__
-void range_cpu_to_be(struct lu_range *dst, const struct lu_range *src)
-{
-        /* check that all fields are converted */
-        CLASSERT(sizeof(*src) ==
-                 sizeof(src->lr_start) +
-                 sizeof(src->lr_end) +
-                 sizeof(src->lr_padding));
-        dst->lr_start = cpu_to_be64(src->lr_start);
-        dst->lr_end = cpu_to_be64(src->lr_end);
-}
-EXPORT_SYMBOL(range_cpu_to_be);
-
-void range_be_to_cpu(struct lu_range *dst, const struct lu_range *src)
-{
-        /* check that all fields are converted */
-        CLASSERT(sizeof(*src) ==
-                 sizeof(src->lr_start) +
-                 sizeof(src->lr_end) +
-                 sizeof(src->lr_padding));
-        dst->lr_start = be64_to_cpu(src->lr_start);
-        dst->lr_end = be64_to_cpu(src->lr_end);
-}
-EXPORT_SYMBOL(range_be_to_cpu);
-
-#endif
index c6c3881..9939c82 100644 (file)
 #include <lustre_mdc.h>
 #include "fid_internal.h"
 
 #include <lustre_mdc.h>
 #include "fid_internal.h"
 
-static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
-                          struct lu_range *output, __u32 opc,
+static int seq_client_rpc(struct lu_client_seq *seq, struct lu_seq_range *input,
+                          struct lu_seq_range *output, __u32 opc,
                           const char *opcname)
 {
         struct obd_export     *exp = seq->lcs_exp;
         struct ptlrpc_request *req;
                           const char *opcname)
 {
         struct obd_export     *exp = seq->lcs_exp;
         struct ptlrpc_request *req;
-        struct lu_range       *out, *in;
+        struct lu_seq_range       *out, *in;
         __u32                 *op;
         int                    rc;
         ENTRY;
         __u32                 *op;
         int                    rc;
         ENTRY;
@@ -95,9 +95,13 @@ static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
         if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
                         SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
         if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
                         SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
+                /* update mdt field of *in, it is required for fld update
+                 * on super sequence allocator node. */
+                if (opc == SEQ_ALLOC_SUPER)
+                        in->lsr_mdt = seq->lcs_space.lsr_mdt;
         } else {
         } else {
-                req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
-                        SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
+                LASSERT(opc == SEQ_ALLOC_META);
+                req->rq_request_portal = SEQ_DATA_PORTAL;
         }
         ptlrpc_at_set_req_timeout(req);
 
         }
         ptlrpc_at_set_req_timeout(req);
 
@@ -135,7 +139,7 @@ out_req:
 
 /* Request sequence-controller node to allocate new super-sequence. */
 int seq_client_replay_super(struct lu_client_seq *seq,
 
 /* Request sequence-controller node to allocate new super-sequence. */
 int seq_client_replay_super(struct lu_client_seq *seq,
-                            struct lu_range *range,
+                            struct lu_seq_range *range,
                             const struct lu_env *env)
 {
         int rc;
                             const struct lu_env *env)
 {
         int rc;
@@ -212,8 +216,8 @@ static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
         }
 
         LASSERT(!range_is_exhausted(&seq->lcs_space));
         }
 
         LASSERT(!range_is_exhausted(&seq->lcs_space));
-        *seqnr = seq->lcs_space.lr_start;
-        seq->lcs_space.lr_start += 1;
+        *seqnr = seq->lcs_space.lsr_start;
+        seq->lcs_space.lsr_start += 1;
 
         CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
                *seqnr);
 
         CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
                *seqnr);
@@ -280,6 +284,13 @@ void seq_client_flush(struct lu_client_seq *seq)
         LASSERT(seq != NULL);
         down(&seq->lcs_sem);
         fid_zero(&seq->lcs_fid);
         LASSERT(seq != NULL);
         down(&seq->lcs_sem);
         fid_zero(&seq->lcs_fid);
+        /**
+         * this id shld not be used for seq range allocation.
+         * set to -1 for dgb check.
+         */
+
+        seq->lcs_space.lsr_mdt = -1;
+
         range_init(&seq->lcs_space);
         up(&seq->lcs_sem);
 }
         range_init(&seq->lcs_space);
         up(&seq->lcs_sem);
 }
index de4bec3..56b950c 100644 (file)
@@ -62,9 +62,6 @@
 #include "fid_internal.h"
 
 #ifdef __KERNEL__
 #include "fid_internal.h"
 
 #ifdef __KERNEL__
-enum {
-        SEQ_TXN_STORE_CREDITS = 20
-};
 
 static struct lu_buf *seq_store_buf(struct seq_thread_info *info)
 {
 
 static struct lu_buf *seq_store_buf(struct seq_thread_info *info)
 {
@@ -76,47 +73,68 @@ static struct lu_buf *seq_store_buf(struct seq_thread_info *info)
         return buf;
 }
 
         return buf;
 }
 
+struct thandle * seq_store_trans_start(struct lu_server_seq *seq,
+                                       const struct lu_env *env, int credit)
+{
+        struct seq_thread_info *info;
+        struct dt_device *dt_dev;
+        struct thandle *th;
+        ENTRY;
+
+        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+        info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
+        LASSERT(info != NULL);
+
+        txn_param_init(&info->sti_txn, credit);
+
+        th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &info->sti_txn);
+        return th;
+}
+
+void seq_store_trans_stop(struct lu_server_seq *seq,
+                          const struct lu_env *env,
+                          struct thandle *th)
+{
+        struct dt_device *dt_dev;
+        ENTRY;
+
+        dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
+
+        dt_dev->dd_ops->dt_trans_stop(env, th);
+}
+
 /* This function implies that caller takes care about locking. */
 int seq_store_write(struct lu_server_seq *seq,
 /* This function implies that caller takes care about locking. */
 int seq_store_write(struct lu_server_seq *seq,
-                    const struct lu_env *env)
+                    const struct lu_env *env,
+                    struct thandle *th)
 {
         struct dt_object *dt_obj = seq->lss_obj;
         struct seq_thread_info *info;
         struct dt_device *dt_dev;
 {
         struct dt_object *dt_obj = seq->lss_obj;
         struct seq_thread_info *info;
         struct dt_device *dt_dev;
-        struct thandle *th;
         loff_t pos = 0;
         loff_t pos = 0;
-       int rc;
-       ENTRY;
+        int rc;
+        ENTRY;
 
         dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
         LASSERT(info != NULL);
 
 
         dt_dev = lu2dt_dev(seq->lss_obj->do_lu.lo_dev);
         info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
         LASSERT(info != NULL);
 
-        /* Stub here, will fix it later. */
-        txn_param_init(&info->sti_txn, SEQ_TXN_STORE_CREDITS);
+        /* Store ranges in le format. */
+        range_cpu_to_le(&info->sti_space, &seq->lss_space);
 
 
-        th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &info->sti_txn);
-        if (!IS_ERR(th)) {
-                /* Store ranges in le format. */
-                range_cpu_to_le(&info->sti_space, &seq->lss_space);
-
-                rc = dt_obj->do_body_ops->dbo_write(env, dt_obj,
-                                                    seq_store_buf(info),
-                                                    &pos, th, BYPASS_CAPA, 1);
-                if (rc == sizeof(info->sti_space)) {
-                        CDEBUG(D_INFO, "%s: Space - "DRANGE"\n",
-                               seq->lss_name, PRANGE(&seq->lss_space));
-                        rc = 0;
-                } else if (rc >= 0) {
-                        rc = -EIO;
-                }
-
-                dt_dev->dd_ops->dt_trans_stop(env, th);
-        } else {
-                rc = PTR_ERR(th);
+        rc = dt_obj->do_body_ops->dbo_write(env, dt_obj,
+                                            seq_store_buf(info),
+                                            &pos, th, BYPASS_CAPA, 1);
+        if (rc == sizeof(info->sti_space)) {
+                CDEBUG(D_INFO, "%s: Space - "DRANGE"\n",
+                       seq->lss_name, PRANGE(&seq->lss_space));
+                rc = 0;
+        } else if (rc >= 0) {
+                rc = -EIO;
         }
         }
-       
-       RETURN(rc);
+
+
+        RETURN(rc);
 }
 
 /*
 }
 
 /*
index 63fbacd..e9976f9 100644 (file)
 static int
 seq_proc_write_common(struct file *file, const char *buffer,
                       unsigned long count, void *data,
 static int
 seq_proc_write_common(struct file *file, const char *buffer,
                       unsigned long count, void *data,
-                      struct lu_range *range)
+                      struct lu_seq_range *range)
 {
 {
-       struct lu_range tmp;
+       struct lu_seq_range tmp;
        int rc;
        ENTRY;
 
        LASSERT(range != NULL);
 
        int rc;
        ENTRY;
 
        LASSERT(range != NULL);
 
-        rc = sscanf(buffer, "[%Lx - %Lx]\n",(long long unsigned *)&tmp.lr_start,
-                    (long long unsigned *)&tmp.lr_end);
+        rc = sscanf(buffer, "[%Lx - %Lx]\n",(long long unsigned *)&tmp.lsr_start,
+                    (long long unsigned *)&tmp.lsr_end);
        if (rc != 2 || !range_is_sane(&tmp) || range_is_zero(&tmp))
                RETURN(-EINVAL);
        *range = tmp;
        if (rc != 2 || !range_is_sane(&tmp) || range_is_zero(&tmp))
                RETURN(-EINVAL);
        *range = tmp;
@@ -88,13 +88,13 @@ seq_proc_write_common(struct file *file, const char *buffer,
 static int
 seq_proc_read_common(char *page, char **start, off_t off,
                      int count, int *eof, void *data,
 static int
 seq_proc_read_common(char *page, char **start, off_t off,
                      int count, int *eof, void *data,
-                     struct lu_range *range)
+                     struct lu_seq_range *range)
 {
        int rc;
        ENTRY;
 
         *eof = 1;
 {
        int rc;
        ENTRY;
 
         *eof = 1;
-        rc = snprintf(page, count, "["LPX64" - "LPX64"]\n",
+        rc = snprintf(page, count, "["LPX64" - "LPX64"]:%x\n",
                       PRANGE(range));
        RETURN(rc);
 }
                       PRANGE(range));
        RETURN(rc);
 }
index 9ec1f1a..695fc21 100644 (file)
@@ -37,6 +37,7 @@
  *
  * FLD (Fids Location Database)
  *
  *
  * FLD (Fids Location Database)
  *
+ * Author: Pravin Shelar <pravin.shelar@sun.com>
  * Author: Yury Umanets <umka@clusterfs.com>
  */
 
  * Author: Yury Umanets <umka@clusterfs.com>
  */
 
 #include <lustre_fld.h>
 #include "fld_internal.h"
 
 #include <lustre_fld.h>
 #include "fld_internal.h"
 
-#ifdef __KERNEL__
-static inline __u32 fld_cache_hash(seqno_t seq)
-{
-        return (__u32)seq;
-}
-
-void fld_cache_flush(struct fld_cache *cache)
-{
-        struct fld_cache_entry *flde;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        struct hlist_node *next;
-        int i;
-        ENTRY;
-
-       /* Free all cache entries. */
-       spin_lock(&cache->fci_lock);
-       for (i = 0; i < cache->fci_hash_size; i++) {
-               bucket = cache->fci_hash_table + i;
-               hlist_for_each_entry_safe(flde, scan, next, bucket, fce_list) {
-                       hlist_del_init(&flde->fce_list);
-                        list_del_init(&flde->fce_lru);
-                        cache->fci_cache_count--;
-                       OBD_FREE_PTR(flde);
-               }
-       }
-        spin_unlock(&cache->fci_lock);
-        EXIT;
-}
-
-struct fld_cache *fld_cache_init(const char *name, int hash_size,
+/**
+ * create fld cache.
+ */
+struct fld_cache *fld_cache_init(const char *name,
                                  int cache_size, int cache_threshold)
 {
                                  int cache_size, int cache_threshold)
 {
-       struct fld_cache *cache;
-        int i;
+        struct fld_cache *cache;
         ENTRY;
 
         LASSERT(name != NULL);
         ENTRY;
 
         LASSERT(name != NULL);
-        LASSERT(IS_PO2(hash_size));
         LASSERT(cache_threshold < cache_size);
 
         OBD_ALLOC_PTR(cache);
         if (cache == NULL)
                 RETURN(ERR_PTR(-ENOMEM));
 
         LASSERT(cache_threshold < cache_size);
 
         OBD_ALLOC_PTR(cache);
         if (cache == NULL)
                 RETURN(ERR_PTR(-ENOMEM));
 
-        INIT_LIST_HEAD(&cache->fci_lru);
+        CFS_INIT_LIST_HEAD(&cache->fci_entries_head);
+        CFS_INIT_LIST_HEAD(&cache->fci_lru);
 
 
-       cache->fci_cache_count = 0;
+        cache->fci_cache_count = 0;
         spin_lock_init(&cache->fci_lock);
 
         strncpy(cache->fci_name, name,
                 sizeof(cache->fci_name));
 
         spin_lock_init(&cache->fci_lock);
 
         strncpy(cache->fci_name, name,
                 sizeof(cache->fci_name));
 
-       cache->fci_hash_size = hash_size;
-       cache->fci_cache_size = cache_size;
+        cache->fci_cache_size = cache_size;
         cache->fci_threshold = cache_threshold;
 
         /* Init fld cache info. */
         cache->fci_threshold = cache_threshold;
 
         /* Init fld cache info. */
-        cache->fci_hash_mask = hash_size - 1;
-        OBD_ALLOC(cache->fci_hash_table,
-                  hash_size * sizeof(*cache->fci_hash_table));
-        if (cache->fci_hash_table == NULL) {
-                OBD_FREE_PTR(cache);
-                RETURN(ERR_PTR(-ENOMEM));
-        }
-
-        for (i = 0; i < hash_size; i++)
-                INIT_HLIST_HEAD(&cache->fci_hash_table[i]);
         memset(&cache->fci_stat, 0, sizeof(cache->fci_stat));
 
         CDEBUG(D_INFO, "%s: FLD cache - Size: %d, Threshold: %d\n",
         memset(&cache->fci_stat, 0, sizeof(cache->fci_stat));
 
         CDEBUG(D_INFO, "%s: FLD cache - Size: %d, Threshold: %d\n",
@@ -142,8 +104,10 @@ struct fld_cache *fld_cache_init(const char *name, int hash_size,
 
         RETURN(cache);
 }
 
         RETURN(cache);
 }
-EXPORT_SYMBOL(fld_cache_init);
 
 
+/**
+ * destroy fld cache.
+ */
 void fld_cache_fini(struct fld_cache *cache)
 {
         __u64 pct;
 void fld_cache_fini(struct fld_cache *cache)
 {
         __u64 pct;
@@ -162,28 +126,109 @@ void fld_cache_fini(struct fld_cache *cache)
         CDEBUG(D_INFO, "FLD cache statistics (%s):\n", cache->fci_name);
         CDEBUG(D_INFO, "  Total reqs: "LPU64"\n", cache->fci_stat.fst_count);
         CDEBUG(D_INFO, "  Cache reqs: "LPU64"\n", cache->fci_stat.fst_cache);
         CDEBUG(D_INFO, "FLD cache statistics (%s):\n", cache->fci_name);
         CDEBUG(D_INFO, "  Total reqs: "LPU64"\n", cache->fci_stat.fst_count);
         CDEBUG(D_INFO, "  Cache reqs: "LPU64"\n", cache->fci_stat.fst_cache);
-        CDEBUG(D_INFO, "  Saved RPCs: "LPU64"\n", cache->fci_stat.fst_inflight);
         CDEBUG(D_INFO, "  Cache hits: "LPU64"%%\n", pct);
 
         CDEBUG(D_INFO, "  Cache hits: "LPU64"%%\n", pct);
 
-       OBD_FREE(cache->fci_hash_table, cache->fci_hash_size *
-                sizeof(*cache->fci_hash_table));
-       OBD_FREE_PTR(cache);
-       
+        OBD_FREE_PTR(cache);
+
+        EXIT;
+}
+
+static inline void fld_cache_entry_delete(struct fld_cache *cache,
+                                         struct fld_cache_entry *node);
+
+/**
+ * fix list by checking new entry with NEXT entry in order.
+ */
+static void fld_fix_new_list(struct fld_cache *cache)
+{
+        struct fld_cache_entry *f_curr;
+        struct fld_cache_entry *f_next;
+        struct lu_seq_range *c_range;
+        struct lu_seq_range *n_range;
+        struct list_head *head = &cache->fci_entries_head;
+        ENTRY;
+
+restart_fixup:
+
+        list_for_each_entry_safe(f_curr, f_next, head, fce_list) {
+                c_range = &f_curr->fce_range;
+                n_range = &f_next->fce_range;
+
+                LASSERT(range_is_sane(c_range));
+                if (&f_next->fce_list == head)
+                        break;
+
+                LASSERT(c_range->lsr_start <= n_range->lsr_start);
+
+                /* check merge possibility with next range */
+                if (c_range->lsr_end == n_range->lsr_start) {
+                        if (c_range->lsr_mdt != n_range->lsr_mdt)
+                                continue;
+                        n_range->lsr_start = c_range->lsr_start;
+                        fld_cache_entry_delete(cache, f_curr);
+                        continue;
+                }
+
+                /* check if current range overlaps with next range. */
+                if (n_range->lsr_start < c_range->lsr_end) {
+
+                        if (c_range->lsr_mdt == n_range->lsr_mdt) {
+                                n_range->lsr_start = c_range->lsr_start;
+                                n_range->lsr_end = max(c_range->lsr_end,
+                                                       n_range->lsr_end);
+
+                                fld_cache_entry_delete(cache, f_curr);
+                        } else {
+                                if (n_range->lsr_end <= c_range->lsr_end) {
+                                        *n_range = *c_range;
+                                        fld_cache_entry_delete(cache, f_curr);
+                                } else
+                                        n_range->lsr_start = c_range->lsr_end;
+                        }
+
+                        /* we could have overlap over next
+                         * range too. better restart. */
+                        goto restart_fixup;
+                }
+
+                /* kill duplicates */
+                if (c_range->lsr_start == n_range->lsr_start &&
+                    c_range->lsr_end == n_range->lsr_end)
+                        fld_cache_entry_delete(cache, f_curr);
+        }
+
         EXIT;
 }
         EXIT;
 }
-EXPORT_SYMBOL(fld_cache_fini);
 
 
-static inline struct hlist_head *
-fld_cache_bucket(struct fld_cache *cache, seqno_t seq)
+/**
+ * add node to fld cache
+ */
+static inline void fld_cache_entry_add(struct fld_cache *cache,
+                                       struct fld_cache_entry *f_new,
+                                       struct list_head *pos)
 {
 {
-        return cache->fci_hash_table + (fld_cache_hash(seq) &
-                                        cache->fci_hash_mask);
+        list_add(&f_new->fce_list, pos);
+        list_add(&f_new->fce_lru, &cache->fci_lru);
+
+        cache->fci_cache_count++;
+        fld_fix_new_list(cache);
 }
 
 }
 
-/*
- * Check if cache needs to be shrinked. If so - do it. Tries to keep all
- * collision lists well balanced. That is, check all of them and remove one
- * entry in list and so on until cache is shrinked enough.
+/**
+ * delete given node from list.
+ */
+static inline void fld_cache_entry_delete(struct fld_cache *cache,
+                                          struct fld_cache_entry *node)
+{
+        list_del(&node->fce_list);
+        list_del(&node->fce_lru);
+        cache->fci_cache_count--;
+        OBD_FREE_PTR(node);
+}
+
+/**
+ * Check if cache needs to be shrunk. If so - do it.
+ * Remove one entry in list and so on until cache is shrunk enough.
  */
 static int fld_cache_shrink(struct fld_cache *cache)
 {
  */
 static int fld_cache_shrink(struct fld_cache *cache)
 {
@@ -200,257 +245,234 @@ static int fld_cache_shrink(struct fld_cache *cache)
         curr = cache->fci_lru.prev;
 
         while (cache->fci_cache_count + cache->fci_threshold >
         curr = cache->fci_lru.prev;
 
         while (cache->fci_cache_count + cache->fci_threshold >
-               cache->fci_cache_size && curr != &cache->fci_lru)
-        {
+               cache->fci_cache_size && curr != &cache->fci_lru) {
+
                 flde = list_entry(curr, struct fld_cache_entry, fce_lru);
                 curr = curr->prev;
                 flde = list_entry(curr, struct fld_cache_entry, fce_lru);
                 curr = curr->prev;
-
-                /* keep inflights */
-                if (flde->fce_inflight)
-                        continue;
-
-                hlist_del_init(&flde->fce_list);
-                list_del_init(&flde->fce_lru);
-                cache->fci_cache_count--;
-                OBD_FREE_PTR(flde);
+                fld_cache_entry_delete(cache, flde);
                 num++;
         }
 
                 num++;
         }
 
-        CDEBUG(D_INFO, "%s: FLD cache - Shrinked by "
+        CDEBUG(D_INFO, "%s: FLD cache - Shrunk by "
                "%d entries\n", cache->fci_name, num);
 
         RETURN(0);
 }
 
                "%d entries\n", cache->fci_name, num);
 
         RETURN(0);
 }
 
-int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq)
+/**
+ * kill all fld cache entries.
+ */
+void fld_cache_flush(struct fld_cache *cache)
 {
 {
-        struct fld_cache_entry *flde, *fldt;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
         ENTRY;
 
         spin_lock(&cache->fci_lock);
         ENTRY;
 
         spin_lock(&cache->fci_lock);
-
-        /* Check if cache already has the entry with such a seq. */
-        bucket = fld_cache_bucket(cache, seq);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq) {
-                        spin_unlock(&cache->fci_lock);
-                        RETURN(-EEXIST);
-                }
-        }
+        cache->fci_cache_size = 0;
+        fld_cache_shrink(cache);
         spin_unlock(&cache->fci_lock);
 
         spin_unlock(&cache->fci_lock);
 
-        /* Allocate new entry. */
-        OBD_ALLOC_PTR(flde);
-        if (!flde)
-                RETURN(-ENOMEM);
+        EXIT;
+}
 
 
-        /*
-         * Check if cache has the entry with such a seq again. It could be added
-         * while we were allocating new entry.
-         */
-        spin_lock(&cache->fci_lock);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq) {
-                        spin_unlock(&cache->fci_lock);
-                        OBD_FREE_PTR(flde);
-                        RETURN(0);
-                }
+/**
+ * punch hole in existing range. divide this range and add new
+ * entry accordingly.
+ */
+
+void fld_cache_punch_hole(struct fld_cache *cache,
+                          struct fld_cache_entry *f_curr,
+                          struct fld_cache_entry *f_new)
+{
+        const struct lu_seq_range *range = &f_new->fce_range;
+        const seqno_t new_start  = range->lsr_start;
+        const seqno_t new_end  = range->lsr_end;
+        struct fld_cache_entry *fldt;
+
+        ENTRY;
+        OBD_ALLOC_GFP(fldt, sizeof *fldt, CFS_ALLOC_ATOMIC);
+        if (!fldt) {
+                OBD_FREE_PTR(f_new);
+                EXIT;
+                /* overlap is not allowed, so dont mess up list. */
+                return;
         }
         }
+        /*  break f_curr RANGE into three RANGES:
+         *        f_curr, f_new , fldt
+         */
 
 
-        /* Add new entry to cache and lru list. */
-        INIT_HLIST_NODE(&flde->fce_list);
-        flde->fce_inflight = 1;
-        flde->fce_invalid = 1;
-        cfs_waitq_init(&flde->fce_waitq);
-        flde->fce_seq = seq;
-
-        hlist_add_head(&flde->fce_list, bucket);
-        list_add(&flde->fce_lru, &cache->fci_lru);
-        cache->fci_cache_count++;
+        /* f_new = *range */
 
 
-        spin_unlock(&cache->fci_lock);
+        /* fldt */
+        fldt->fce_range.lsr_start = new_end;
+        fldt->fce_range.lsr_end = f_curr->fce_range.lsr_end;
+        fldt->fce_range.lsr_mdt = f_curr->fce_range.lsr_mdt;
 
 
-        RETURN(0);
+        /* f_curr */
+        f_curr->fce_range.lsr_end = new_start;
+
+        /* add these two entries to list */
+        fld_cache_entry_add(cache, f_new, &f_curr->fce_list);
+        fld_cache_entry_add(cache, fldt, &f_new->fce_list);
+
+        /* no need to fixup */
+        EXIT;
 }
 }
-EXPORT_SYMBOL(fld_cache_insert_inflight);
 
 
-int fld_cache_insert(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t mds)
+/**
+ * handle range overlap in fld cache.
+ */
+void fld_cache_overlap_handle(struct fld_cache *cache,
+                              struct fld_cache_entry *f_curr,
+                              struct fld_cache_entry *f_new)
 {
 {
-        struct fld_cache_entry *flde, *fldt;
-        struct hlist_head *bucket;
-        struct hlist_node *scan;
-        int rc;
-        ENTRY;
+        const struct lu_seq_range *range = &f_new->fce_range;
+        const seqno_t new_start  = range->lsr_start;
+        const seqno_t new_end  = range->lsr_end;
+        const mdsno_t mdt = range->lsr_mdt;
 
 
-        spin_lock(&cache->fci_lock);
+        /* this is overlap case, these case are checking overlapping with
+         * prev range only. fixup will handle overlaping with next range. */
 
 
-        /* Check if need to shrink cache. */
-        rc = fld_cache_shrink(cache);
-        if (rc) {
-                spin_unlock(&cache->fci_lock);
-                RETURN(rc);
-        }
+        if (f_curr->fce_range.lsr_mdt == mdt) {
+                f_curr->fce_range.lsr_start = min(f_curr->fce_range.lsr_start,
+                                                  new_start);
 
 
-        /* Check if cache already has the entry with such a seq. */
-        bucket = fld_cache_bucket(cache, seq);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq) {
-                        if (fldt->fce_inflight) {
-                                /* set mds for inflight entry */
-                                fldt->fce_mds = mds;
-                                fldt->fce_inflight = 0;
-                                fldt->fce_invalid = 0;
-                                cfs_waitq_signal(&fldt->fce_waitq);
-                                rc = 0;
-                        } else
-                                rc = -EEXIST;
-                        spin_unlock(&cache->fci_lock);
-                        RETURN(rc);
-                }
-        }
-        spin_unlock(&cache->fci_lock);
+                f_curr->fce_range.lsr_end = max(f_curr->fce_range.lsr_end,
+                                                new_end);
 
 
-        /* Allocate new entry. */
-        OBD_ALLOC_PTR(flde);
-        if (!flde)
-                RETURN(-ENOMEM);
+                OBD_FREE_PTR(f_new);
+                fld_fix_new_list(cache);
 
 
-        /*
-         * Check if cache has the entry with such a seq again. It could be added
-         * while we were allocating new entry.
-         */
-        spin_lock(&cache->fci_lock);
-        hlist_for_each_entry(fldt, scan, bucket, fce_list) {
-                if (fldt->fce_seq == seq) {
-                        spin_unlock(&cache->fci_lock);
-                        OBD_FREE_PTR(flde);
-                        RETURN(0);
-                }
-        }
+        } else if (new_start <= f_curr->fce_range.lsr_start &&
+                        f_curr->fce_range.lsr_end <= new_end) {
+                /* case 1: new range completely overshadowed existing range.
+                 *         e.g. whole range migrated. update fld cache entry */
 
 
-        /* Add new entry to cache and lru list. */
-        INIT_HLIST_NODE(&flde->fce_list);
-        flde->fce_mds = mds;
-        flde->fce_seq = seq;
-        flde->fce_inflight = 0;
-        flde->fce_invalid = 0;
-
-        hlist_add_head(&flde->fce_list, bucket);
-        list_add(&flde->fce_lru, &cache->fci_lru);
-        cache->fci_cache_count++;
+                f_curr->fce_range = *range;
+                OBD_FREE_PTR(f_new);
+                fld_fix_new_list(cache);
 
 
-        spin_unlock(&cache->fci_lock);
+        } else if (f_curr->fce_range.lsr_start < new_start &&
+                        new_end < f_curr->fce_range.lsr_end) {
+                /* case 2: new range fit within existing range. */
 
 
-        RETURN(0);
+                fld_cache_punch_hole(cache, f_curr, f_new);
+
+        } else  if (new_end <= f_curr->fce_range.lsr_end) {
+                /* case 3: overlap:
+                 *         [new_start [c_start  new_end)  c_end)
+                 */
+
+                LASSERT(new_start <= f_curr->fce_range.lsr_start);
+
+                f_curr->fce_range.lsr_start = new_end;
+                fld_cache_entry_add(cache, f_new, f_curr->fce_list.prev);
+
+        } else if (f_curr->fce_range.lsr_start <= new_start) {
+                /* case 4: overlap:
+                 *         [c_start [new_start c_end) new_end)
+                 */
+
+                LASSERT(f_curr->fce_range.lsr_end <= new_end);
+
+                f_curr->fce_range.lsr_end = new_start;
+                fld_cache_entry_add(cache, f_new, &f_curr->fce_list);
+        } else
+                CERROR("NEW range ="DRANGE" curr = "DRANGE"\n",
+                       PRANGE(range),PRANGE(&f_curr->fce_range));
 }
 }
-EXPORT_SYMBOL(fld_cache_insert);
 
 
-void fld_cache_delete(struct fld_cache *cache, seqno_t seq)
+/**
+ * Insert FLD entry in FLD cache.
+ *
+ * This function handles all cases of merging and breaking up of
+ * ranges.
+ */
+void fld_cache_insert(struct fld_cache *cache,
+                      const struct lu_seq_range *range)
 {
 {
-        struct fld_cache_entry *flde;
-        struct hlist_node *scan, *n;
-        struct hlist_head *bucket;
+        struct fld_cache_entry *f_new;
+        struct fld_cache_entry *f_curr;
+        struct fld_cache_entry *n;
+        struct list_head *head;
+        struct list_head *prev = NULL;
+        const seqno_t new_start  = range->lsr_start;
+        const seqno_t new_end  = range->lsr_end;
         ENTRY;
 
         ENTRY;
 
-        bucket = fld_cache_bucket(cache, seq);
-       
+        LASSERT(range_is_sane(range));
+
+        /* Allocate new entry. */
+        OBD_ALLOC_PTR(f_new);
+        if (!f_new) {
+                EXIT;
+                return;
+        }
+
+        f_new->fce_range = *range;
+
+        /*
+         * Duplicate entries are eliminated in inset op.
+         * So we don't need to search new entry before starting insertion loop.
+         */
+
         spin_lock(&cache->fci_lock);
         spin_lock(&cache->fci_lock);
-        hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
-                if (flde->fce_seq == seq) {
-                        hlist_del_init(&flde->fce_list);
-                        list_del_init(&flde->fce_lru);
-                        if (flde->fce_inflight) {
-                                flde->fce_inflight = 0;
-                                flde->fce_invalid = 1;
-                                cfs_waitq_signal(&flde->fce_waitq);
-                        }
-                        cache->fci_cache_count--;
-                       OBD_FREE_PTR(flde);
-                        GOTO(out_unlock, 0);
+        fld_cache_shrink(cache);
+
+        head = &cache->fci_entries_head;
+
+        list_for_each_entry_safe(f_curr, n, head, fce_list) {
+                /* add list if next is end of list */
+                if (new_end < f_curr->fce_range.lsr_start)
+                        break;
+
+                prev = &f_curr->fce_list;
+                /* check if this range is to left of new range. */
+                if (new_start < f_curr->fce_range.lsr_end) {
+                        fld_cache_overlap_handle(cache, f_curr, f_new);
+                        goto out;
                 }
         }
 
                 }
         }
 
-        EXIT;
-out_unlock:
-        spin_unlock(&cache->fci_lock);
-}
-EXPORT_SYMBOL(fld_cache_delete);
+        if (prev == NULL)
+                prev = head;
 
 
-static int fld_check_inflight(struct fld_cache_entry *flde)
-{
-        return (flde->fce_inflight);
+        /* Add new entry to cache and lru list. */
+        fld_cache_entry_add(cache, f_new, prev);
+out:
+        spin_unlock(&cache->fci_lock);
+        EXIT;
 }
 
 }
 
+/**
+ * lookup \a seq sequence for range in fld cache.
+ */
 int fld_cache_lookup(struct fld_cache *cache,
 int fld_cache_lookup(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t *mds)
+                     const seqno_t seq, struct lu_seq_range *range)
 {
         struct fld_cache_entry *flde;
 {
         struct fld_cache_entry *flde;
-        struct hlist_node *scan, *n;
-        struct hlist_head *bucket;
+        struct list_head *head;
         ENTRY;
 
         ENTRY;
 
-        bucket = fld_cache_bucket(cache, seq);
 
         spin_lock(&cache->fci_lock);
 
         spin_lock(&cache->fci_lock);
+        head = &cache->fci_entries_head;
+
         cache->fci_stat.fst_count++;
         cache->fci_stat.fst_count++;
-        hlist_for_each_entry_safe(flde, scan, n, bucket, fce_list) {
-                if (flde->fce_seq == seq) {
-                        if (flde->fce_inflight) {
-                                /* lookup RPC is inflight need to wait */
-                                struct l_wait_info lwi;
-                                spin_unlock(&cache->fci_lock);
-                                lwi = LWI_TIMEOUT(0, NULL, NULL);
-                                l_wait_event(flde->fce_waitq,
-                                             !fld_check_inflight(flde), &lwi);
-                                LASSERT(!flde->fce_inflight);
-                                if (flde->fce_invalid) 
-                                        RETURN(-ENOENT);
-                                
-                                *mds = flde->fce_mds;
-                                cache->fci_stat.fst_inflight++;
-                        } else {
-                                LASSERT(!flde->fce_invalid);
-                                *mds = flde->fce_mds;
-                                list_del(&flde->fce_lru);
-                                list_add(&flde->fce_lru, &cache->fci_lru);
-                                cache->fci_stat.fst_cache++;
-                                spin_unlock(&cache->fci_lock);
-                        }
+        list_for_each_entry(flde, head, fce_list) {
+                if (flde->fce_range.lsr_start > seq)
+                        break;
+
+                if (range_within(&flde->fce_range, seq)) {
+                        *range = flde->fce_range;
+
+                        /* update position of this entry in lru list. */
+                        list_move(&flde->fce_lru, &cache->fci_lru);
+                        cache->fci_stat.fst_cache++;
+                        spin_unlock(&cache->fci_lock);
                         RETURN(0);
                 }
         }
         spin_unlock(&cache->fci_lock);
         RETURN(-ENOENT);
 }
                         RETURN(0);
                 }
         }
         spin_unlock(&cache->fci_lock);
         RETURN(-ENOENT);
 }
-EXPORT_SYMBOL(fld_cache_lookup);
-#else
-int fld_cache_insert_inflight(struct fld_cache *cache, seqno_t seq)
-{
-        return -ENOTSUPP;
-}
-EXPORT_SYMBOL(fld_cache_insert_inflight);
-
-int fld_cache_insert(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t mds)
-{
-        return -ENOTSUPP;
-}
-EXPORT_SYMBOL(fld_cache_insert);
-
-void fld_cache_delete(struct fld_cache *cache,
-                      seqno_t seq)
-{
-        return;
-}
-EXPORT_SYMBOL(fld_cache_delete);
-
-int fld_cache_lookup(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t *mds)
-{
-        return -ENOTSUPP;
-}
-EXPORT_SYMBOL(fld_cache_lookup);
-#endif
index 0f6e7cc..2b6ab12 100644 (file)
@@ -39,6 +39,7 @@
  *
  * Author: Yury Umanets <umka@clusterfs.com>
  * Author: WangDi <wangdi@clusterfs.com>
  *
  * Author: Yury Umanets <umka@clusterfs.com>
  * Author: WangDi <wangdi@clusterfs.com>
+ * Author: Pravin Shelar <pravin.shelar@sun.com>
  */
 
 #ifndef EXPORT_SYMTAB
  */
 
 #ifndef EXPORT_SYMTAB
@@ -109,106 +110,200 @@ static void __exit fld_mod_exit(void)
         }
 }
 
         }
 }
 
-/* Insert index entry and update cache. */
+/**
+ * Insert FLD index entry and update FLD cache.
+ *
+ * First it try to merge given range with existing range then update
+ * FLD index and FLD cache accordingly. FLD index consistency is maintained
+ * by this function.
+ * This function is called from the sequence allocator when a super-sequence
+ * is granted to a server.
+ */
+
 int fld_server_create(struct lu_server_fld *fld,
                       const struct lu_env *env,
 int fld_server_create(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq, mdsno_t mds)
+                      struct lu_seq_range *add_range,
+                      struct thandle *th)
 {
 {
-        int rc;
+        struct lu_seq_range *erange;
+        struct lu_seq_range *new;
+        struct fld_thread_info *info;
+        int rc = 0;
+        int do_merge=0;
+
         ENTRY;
         ENTRY;
-        
-        rc = fld_index_create(fld, env, seq, mds);
-        
-        if (rc == 0) {
-                /*
-                 * Do not return result of calling fld_cache_insert()
-                 * here. First of all because it may return -EEXISTS. Another
-                 * reason is that, we do not want to stop proceeding even after
-                 * cache errors.
+
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        mutex_lock(&fld->lsf_lock);
+
+        erange = &info->fti_lrange;
+        new = &info->fti_irange;
+        *new = *add_range;
+
+        /* STEP 1: try to merge with previous range */
+        rc = fld_index_lookup(fld, env, new->lsr_start, erange);
+        if (!rc) {
+                /* in case of range overlap, mdt ID must be same for both ranges */
+                if (new->lsr_mdt != erange->lsr_mdt) {
+                        CERROR("mdt[%x] for given range is different from"
+                               "existing overlapping range mdt[%x]\n",
+                                new->lsr_mdt, erange->lsr_mdt);
+                        rc = -EIO;
+                        GOTO(out, rc);
+                }
+
+                if (new->lsr_end < erange->lsr_end)
+                        GOTO(out, rc);
+                do_merge = 1;
+
+        } else if (rc == -ENOENT) {
+                /* check for merge case: optimizes for single mds lustre.
+                 * As entry does not exist, returned entry must be left side
+                 * entry compared to start of new range (ref dio_lookup()).
+                 * So try to merge from left.
                  */
                  */
-                fld_cache_insert(fld->lsf_cache, seq, mds);
+                if (new->lsr_start == erange->lsr_end &&
+                    new->lsr_mdt == erange->lsr_mdt)
+                        do_merge = 1;
+        } else {
+                /* no overlap allowed in fld, so failure in lookup is error */
+                GOTO(out, rc);
         }
 
         }
 
-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_server_create);
+        if (do_merge) {
+                /* new range can be combined with existing one.
+                 * So delete existing range.
+                 */
 
 
-/* Delete index entry. */
-int fld_server_delete(struct lu_server_fld *fld,
-                      const struct lu_env *env,
-                      seqno_t seq)
-{
-        int rc;
-        ENTRY;
+                rc = fld_index_delete(fld, env, erange, th);
+                if (rc == 0) {
+                        new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                        new->lsr_end = max(erange->lsr_end, new->lsr_end);
+                } else
+                        GOTO(out, rc);
+
+                do_merge = 0;
+        }
 
 
-        fld_cache_delete(fld->lsf_cache, seq);
-        rc = fld_index_delete(fld, env, seq);
+        /* STEP 2: try to merge with next range */
+        rc = fld_index_lookup(fld, env, new->lsr_end, erange);
+        if (!rc) {
+                /* case range overlap: with right side entry. */
+                if (new->lsr_mdt == erange->lsr_mdt)
+                        do_merge = 1;
+        } else if (rc == -ENOENT) {
+                /* this range is left of new range end point */
+                LASSERT(erange->lsr_end <= new->lsr_end);
+
+                if (new->lsr_end == erange->lsr_end)
+                        do_merge = 1;
+                if (new->lsr_start <= erange->lsr_start)
+                        do_merge = 1;
+        } else
+               GOTO(out, rc);
+
+        if (do_merge) {
+                if (new->lsr_mdt != erange->lsr_mdt) {
+                        CERROR("mdt[%x] for given range is different from"
+                               "existing overlapping range mdt[%x]\n",
+                                new->lsr_mdt, erange->lsr_mdt);
+                        rc = -EIO;
+                        GOTO(out, rc);
+                }
         
         
+                /* merge with next range */
+                rc = fld_index_delete(fld, env, erange, th);
+                if (rc == 0) {
+                        new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                        new->lsr_end = max(erange->lsr_end, new->lsr_end);
+                } else
+                        GOTO(out, rc);
+        }
+
+        /* now update fld entry. */
+        rc = fld_index_create(fld, env, new, th);
+
+        LASSERT(rc != -EEXIST);
+out:
+        if (rc == 0)
+                fld_cache_insert(fld->lsf_cache, new);
+
+        mutex_unlock(&fld->lsf_lock);
+
+        CDEBUG((rc != 0 ? D_ERROR : D_INFO),
+               "%s: FLD create: given range : "DRANGE
+               "after merge "DRANGE" rc = %d \n", fld->lsf_name,
+                PRANGE(add_range), PRANGE(new), rc);
+
         RETURN(rc);
 }
         RETURN(rc);
 }
-EXPORT_SYMBOL(fld_server_delete);
 
 
-/* Lookup mds by seq. */
+EXPORT_SYMBOL(fld_server_create);
+
+/**
+ *  Lookup mds by seq, returns a range for given seq.
+ *
+ *  If that entry is not cached in fld cache, request is sent to super
+ *  sequence controller node (MDT0). All other MDT[1...N] and client
+ *  cache fld entries, but this cache is not persistent.
+ */
+
 int fld_server_lookup(struct lu_server_fld *fld,
                       const struct lu_env *env,
 int fld_server_lookup(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq, mdsno_t *mds)
+                      seqno_t seq, struct lu_seq_range *range)
 {
         int rc;
         ENTRY;
 {
         int rc;
         ENTRY;
-        
+
         /* Lookup it in the cache. */
         /* Lookup it in the cache. */
-        rc = fld_cache_lookup(fld->lsf_cache, seq, mds);
+        rc = fld_cache_lookup(fld->lsf_cache, seq, range);
         if (rc == 0)
                 RETURN(0);
 
         if (rc == 0)
                 RETURN(0);
 
-        rc = fld_index_lookup(fld, env, seq, mds);
-        if (rc == 0) {
-                /*
-                 * Do not return error here as well. See previous comment in
-                 * same situation in function fld_server_create().
+        if (fld->lsf_obj)
+                rc = fld_index_lookup(fld, env, seq, range);
+        else {
+                LASSERT(fld->lsf_control_exp);
+                /* send request to mdt0 i.e. super seq. controller.
+                 * This is temporary solution, long term solution is fld
+                 * replication on all mdt servers.
                  */
                  */
-                fld_cache_insert(fld->lsf_cache, seq, *mds);
+                rc = fld_client_rpc(fld->lsf_control_exp,
+                                    range, FLD_LOOKUP);
         }
         }
+
+        if (rc == 0)
+                fld_cache_insert(fld->lsf_cache, range);
+
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_server_lookup);
 
         RETURN(rc);
 }
 EXPORT_SYMBOL(fld_server_lookup);
 
+/**
+ * All MDT server handle fld lookup operation. But only MDT0 has fld index.
+ * if entry is not found in cache we need to forward lookup request to MDT0
+ */
+
 static int fld_server_handle(struct lu_server_fld *fld,
                              const struct lu_env *env,
 static int fld_server_handle(struct lu_server_fld *fld,
                              const struct lu_env *env,
-                             __u32 opc, struct md_fld *mf,
+                             __u32 opc, struct lu_seq_range *range,
                              struct fld_thread_info *info)
 {
         int rc;
         ENTRY;
 
         switch (opc) {
                              struct fld_thread_info *info)
 {
         int rc;
         ENTRY;
 
         switch (opc) {
-        case FLD_CREATE:
-                rc = fld_server_create(fld, env,
-                                       mf->mf_seq, mf->mf_mds);
-
-                /* Do not return -EEXIST error for resent case */
-                if ((info->fti_flags & MSG_RESENT) && rc == -EEXIST)
-                        rc = 0;
-                break;
-        case FLD_DELETE:
-                rc = fld_server_delete(fld, env, mf->mf_seq);
-
-                /* Do not return -ENOENT error for resent case */
-                if ((info->fti_flags & MSG_RESENT) && rc == -ENOENT)
-                        rc = 0;
-                break;
         case FLD_LOOKUP:
                 rc = fld_server_lookup(fld, env,
         case FLD_LOOKUP:
                 rc = fld_server_lookup(fld, env,
-                                       mf->mf_seq, &mf->mf_mds);
+                                       range->lsr_start, range);
                 break;
         default:
                 rc = -EINVAL;
                 break;
         }
 
                 break;
         default:
                 rc = -EINVAL;
                 break;
         }
 
-        CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, seq: "
-               LPX64", mds: "LPU64")\n", fld->lsf_name, rc, opc,
-               mf->mf_seq, mf->mf_mds);
+        CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, range: "
+               DRANGE"\n", fld->lsf_name, rc, opc, PRANGE(range));
         
         RETURN(rc);
 
         
         RETURN(rc);
 
@@ -218,8 +313,8 @@ static int fld_req_handle(struct ptlrpc_request *req,
                           struct fld_thread_info *info)
 {
         struct lu_site *site;
                           struct fld_thread_info *info)
 {
         struct lu_site *site;
-        struct md_fld *in;
-        struct md_fld *out;
+        struct lu_seq_range *in;
+        struct lu_seq_range *out;
         int rc;
         __u32 *opc;
         ENTRY;
         int rc;
         __u32 *opc;
         ENTRY;
@@ -252,8 +347,6 @@ static int fld_req_handle(struct ptlrpc_request *req,
 static void fld_thread_info_init(struct ptlrpc_request *req,
                                  struct fld_thread_info *info)
 {
 static void fld_thread_info_init(struct ptlrpc_request *req,
                                  struct fld_thread_info *info)
 {
-        info->fti_flags = lustre_msg_get_flags(req->rq_reqmsg);
-
         info->fti_pill = &req->rq_pill;
         /* Init request capsule. */
         req_capsule_init(info->fti_pill, req, RCL_SERVER);
         info->fti_pill = &req->rq_pill;
         /* Init request capsule. */
         req_capsule_init(info->fti_pill, req, RCL_SERVER);
@@ -301,21 +394,27 @@ EXPORT_SYMBOL(fld_query);
  *
  * fid_is_local() is supposed to be used in assertion checks only.
  */
  *
  * fid_is_local() is supposed to be used in assertion checks only.
  */
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
+int fid_is_local(const struct lu_env *env,
+                 struct lu_site *site, const struct lu_fid *fid)
 {
         int result;
         struct md_site *msite;
 {
         int result;
         struct md_site *msite;
+        struct lu_seq_range *range;
+        struct fld_thread_info *info;
+        ENTRY;
+
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        range = &info->fti_lrange;
 
         result = 1; /* conservatively assume fid is local */
         msite = lu_site2md(site);
         if (msite->ms_client_fld != NULL) {
 
         result = 1; /* conservatively assume fid is local */
         msite = lu_site2md(site);
         if (msite->ms_client_fld != NULL) {
-                mdsno_t mds;
                 int rc;
 
                 rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
                 int rc;
 
                 rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
-                                      fid_seq(fid), &mds);
+                                      fid_seq(fid), range);
                 if (rc == 0)
                 if (rc == 0)
-                        result = (mds == msite->ms_node_id);
+                        result = (range->lsr_mdt == msite->ms_node_id);
         }
         return result;
 }
         }
         return result;
 }
@@ -363,7 +462,8 @@ static void fld_server_proc_fini(struct lu_server_fld *fld)
 #endif
 
 int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
 #endif
 
 int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
-                    const char *prefix, const struct lu_env *env)
+                    const char *prefix, const struct lu_env *env,
+                    int mds_node_id)
 {
         int cache_size, cache_threshold;
         int rc;
 {
         int cache_size, cache_threshold;
         int rc;
@@ -378,8 +478,8 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
         cache_threshold = cache_size *
                 FLD_SERVER_CACHE_THRESHOLD / 100;
 
         cache_threshold = cache_size *
                 FLD_SERVER_CACHE_THRESHOLD / 100;
 
+        mutex_init(&fld->lsf_lock);
         fld->lsf_cache = fld_cache_init(fld->lsf_name,
         fld->lsf_cache = fld_cache_init(fld->lsf_name,
-                                        FLD_SERVER_HTABLE_SIZE,
                                         cache_size, cache_threshold);
         if (IS_ERR(fld->lsf_cache)) {
                 rc = PTR_ERR(fld->lsf_cache);
                                         cache_size, cache_threshold);
         if (IS_ERR(fld->lsf_cache)) {
                 rc = PTR_ERR(fld->lsf_cache);
@@ -387,14 +487,18 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
                 GOTO(out, rc);
         }
 
                 GOTO(out, rc);
         }
 
-        rc = fld_index_init(fld, env, dt);
-        if (rc)
-                GOTO(out, rc);
+        if (!mds_node_id) {
+                rc = fld_index_init(fld, env, dt);
+                if (rc)
+                        GOTO(out, rc);
+        } else
+                fld->lsf_obj = NULL;
 
         rc = fld_server_proc_init(fld);
         if (rc)
                 GOTO(out, rc);
 
 
         rc = fld_server_proc_init(fld);
         if (rc)
                 GOTO(out, rc);
 
+        fld->lsf_control_exp = NULL;
         EXIT;
 out:
         if (rc)
         EXIT;
 out:
         if (rc)
index 1b927ea..03da47e 100644 (file)
 #include <dt_object.h>
 #include <md_object.h>
 #include <lustre_mdc.h>
 #include <dt_object.h>
 #include <md_object.h>
 #include <lustre_mdc.h>
+#include <lustre_fid.h>
 #include <lustre_fld.h>
 #include "fld_internal.h"
 
 const char fld_index_name[] = "fld";
 #include <lustre_fld.h>
 #include "fld_internal.h"
 
 const char fld_index_name[] = "fld";
-EXPORT_SYMBOL(fld_index_name);
+
+static const struct lu_seq_range IGIF_FLD_RANGE = {
+        .lsr_start = 1,
+        .lsr_end   = IDIF_SEQ_START,
+        .lsr_mdt   = 0
+};
 
 const struct dt_index_features fld_index_features = {
         .dif_flags       = DT_IND_UPDATE,
         .dif_keysize_min = sizeof(seqno_t),
         .dif_keysize_max = sizeof(seqno_t),
 
 const struct dt_index_features fld_index_features = {
         .dif_flags       = DT_IND_UPDATE,
         .dif_keysize_min = sizeof(seqno_t),
         .dif_keysize_max = sizeof(seqno_t),
-        .dif_recsize_min = sizeof(mdsno_t),
-        .dif_recsize_max = sizeof(mdsno_t),
+        .dif_recsize_min = sizeof(struct lu_seq_range),
+        .dif_recsize_max = sizeof(struct lu_seq_range),
         .dif_ptrsize     = 4
 };
 
         .dif_ptrsize     = 4
 };
 
-EXPORT_SYMBOL(fld_index_features);
-
-/*
- * number of blocks to reserve for particular operations. Should be function of
- * ... something. Stub for now.
- */
-enum {
-        FLD_TXN_INDEX_INSERT_CREDITS  = 20,
-        FLD_TXN_INDEX_DELETE_CREDITS  = 20,
-};
-
 extern struct lu_context_key fld_thread_key;
 
 static struct dt_key *fld_key(const struct lu_env *env,
 extern struct lu_context_key fld_thread_key;
 
 static struct dt_key *fld_key(const struct lu_env *env,
@@ -102,86 +97,174 @@ static struct dt_key *fld_key(const struct lu_env *env,
 }
 
 static struct dt_rec *fld_rec(const struct lu_env *env,
 }
 
 static struct dt_rec *fld_rec(const struct lu_env *env,
-                              const mdsno_t mds)
+                              const struct lu_seq_range *range)
 {
         struct fld_thread_info *info;
 {
         struct fld_thread_info *info;
+        struct lu_seq_range *rec;
         ENTRY;
 
         info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
         LASSERT(info != NULL);
         ENTRY;
 
         info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
         LASSERT(info != NULL);
+        rec = &info->fti_rec;
+
+        range_cpu_to_be(rec, range);
+        RETURN((void *)rec);
+}
+
+struct thandle* fld_trans_start(struct lu_server_fld *fld,
+                                const struct lu_env *env, int credit)
+{
+        struct fld_thread_info *info;
+        struct dt_device *dt_dev;
+        struct txn_param *p;
+
+        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        p = &info->fti_txn_param;
+        txn_param_init(p, credit);
 
 
-        info->fti_rec = cpu_to_be64(mds);
-        RETURN((void *)&info->fti_rec);
+        return dt_dev->dd_ops->dt_trans_start(env, dt_dev, p);
 }
 
 }
 
+void fld_trans_stop(struct lu_server_fld *fld,
+                    const struct lu_env *env, struct thandle* th)
+{
+        struct dt_device *dt_dev;
+
+        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
+        dt_dev->dd_ops->dt_trans_stop(env, th);
+}
+
+/**
+ * insert range in fld store.
+ *
+ *      \param  range  range to be inserted
+ *      \param  th     transaction for this operation as it could compound
+ *                     transaction.
+ *
+ *      \retval  0  success
+ *      \retval  -ve error
+ */
+
 int fld_index_create(struct lu_server_fld *fld,
                      const struct lu_env *env,
 int fld_index_create(struct lu_server_fld *fld,
                      const struct lu_env *env,
-                     seqno_t seq, mdsno_t mds)
+                     const struct lu_seq_range *range,
+                     struct thandle *th)
 {
         struct dt_object *dt_obj = fld->lsf_obj;
         struct dt_device *dt_dev;
 {
         struct dt_object *dt_obj = fld->lsf_obj;
         struct dt_device *dt_dev;
-        struct txn_param txn;
-        struct thandle *th;
+        seqno_t start;
         int rc;
         int rc;
+
         ENTRY;
 
         ENTRY;
 
+        start = range->lsr_start;
+        LASSERT(range_is_sane(range));
         dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
 
         dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
 
-        /* stub here, will fix it later */
-        txn_param_init(&txn, FLD_TXN_INDEX_INSERT_CREDITS);
-
-        th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn);
-        if (!IS_ERR(th)) {
-                rc = dt_obj->do_index_ops->dio_insert(env, dt_obj,
-                                                      fld_rec(env, mds),
-                                                      fld_key(env, seq),
-                                                      th, BYPASS_CAPA, 1);
-                dt_dev->dd_ops->dt_trans_stop(env, th);
-        } else
-                rc = PTR_ERR(th);
+        rc = dt_obj->do_index_ops->dio_insert(env, dt_obj,
+                                              fld_rec(env, range),
+                                              fld_key(env, start),
+                                              th, BYPASS_CAPA, 1);
+
+        CDEBUG(D_INFO, "%s: insert given range : "DRANGE" rc = %d\n",
+               fld->lsf_name, PRANGE(range), rc);
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+/**
+ * delete range in fld store.
+ *
+ *      \param  range range to be deleted
+ *      \param  th     transaction
+ *
+ *      \retval  0  success
+ *      \retval  -ve error
+ */
+
 int fld_index_delete(struct lu_server_fld *fld,
                      const struct lu_env *env,
 int fld_index_delete(struct lu_server_fld *fld,
                      const struct lu_env *env,
-                     seqno_t seq)
+                     struct lu_seq_range *range,
+                     struct thandle   *th)
 {
         struct dt_object *dt_obj = fld->lsf_obj;
         struct dt_device *dt_dev;
 {
         struct dt_object *dt_obj = fld->lsf_obj;
         struct dt_device *dt_dev;
-        struct txn_param txn;
-        struct thandle *th;
+        seqno_t seq = range->lsr_start;
         int rc;
         int rc;
+
         ENTRY;
 
         dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
         ENTRY;
 
         dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
-        txn_param_init(&txn, FLD_TXN_INDEX_DELETE_CREDITS);
-        th = dt_dev->dd_ops->dt_trans_start(env, dt_dev, &txn);
-        if (!IS_ERR(th)) {
-                rc = dt_obj->do_index_ops->dio_delete(env, dt_obj,
-                                                      fld_key(env, seq), th,
-                                                      BYPASS_CAPA);
-                dt_dev->dd_ops->dt_trans_stop(env, th);
-        } else
-                rc = PTR_ERR(th);
+        rc = dt_obj->do_index_ops->dio_delete(env, dt_obj,
+                                              fld_key(env, seq), th,
+                                              BYPASS_CAPA);
+
+        CDEBUG(D_INFO, "%s: delete given range : "DRANGE" rc = %d\n",
+               fld->lsf_name, PRANGE(range), rc);
+
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
+/**
+ * lookup range for a seq passed
+ *
+ *      \param  seq     seq for lookup.
+ *      \param  range   result of lookup.
+ *
+ *      \retval  0  success
+ *      \retval  -ve error
+ */
+
 int fld_index_lookup(struct lu_server_fld *fld,
                      const struct lu_env *env,
 int fld_index_lookup(struct lu_server_fld *fld,
                      const struct lu_env *env,
-                     seqno_t seq, mdsno_t *mds)
+                     seqno_t seq,
+                     struct lu_seq_range *range)
 {
 {
-        struct dt_object *dt_obj = fld->lsf_obj;
-        struct dt_rec    *rec = fld_rec(env, 0);
+        struct dt_object        *dt_obj = fld->lsf_obj;
+        struct lu_seq_range     *fld_rec;
+        struct dt_key           *key = fld_key(env, seq);
+        struct fld_thread_info  *info;
         int rc;
         int rc;
+
         ENTRY;
 
         ENTRY;
 
-        rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj, rec,
-                                              fld_key(env, seq), BYPASS_CAPA);
-        if (rc > 0) {
-                *mds = be64_to_cpu(*(__u64 *)rec);
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        fld_rec = &info->fti_rec;
+
+        rc = dt_obj->do_index_ops->dio_lookup(env, dt_obj,
+                                              (struct dt_rec*) fld_rec,
+                                              key, BYPASS_CAPA);
+
+        if (rc >= 0) {
+                range_be_to_cpu(fld_rec, fld_rec);
+                *range = *fld_rec;
+                if (range_within(range, seq))
+                        rc = 0;
+                else
+                        rc = -ENOENT;
+        }
+
+        CDEBUG(D_INFO, "%s: lookup seq = %llx range : "DRANGE" rc = %d\n",
+               fld->lsf_name, seq, PRANGE(range), rc);
+
+        RETURN(rc);
+}
+
+static int fld_insert_igif_fld(struct lu_server_fld *fld,
+                               const struct lu_env *env)
+{
+        struct thandle *th;
+        int rc;
+
+        ENTRY;
+        th = fld_trans_start(fld, env, FLD_TXN_INDEX_INSERT_CREDITS);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+
+        rc = fld_index_create(fld, env, &IGIF_FLD_RANGE, th);
+        fld_trans_stop(fld, env, th);
+        if (rc == -EEXIST)
                 rc = 0;
                 rc = 0;
-        } else
-                rc = -ENOENT;
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
@@ -199,11 +282,20 @@ int fld_index_init(struct lu_server_fld *fld,
                 fld->lsf_obj = dt_obj;
                 rc = dt_obj->do_ops->do_index_try(env, dt_obj,
                                                   &fld_index_features);
                 fld->lsf_obj = dt_obj;
                 rc = dt_obj->do_ops->do_index_try(env, dt_obj,
                                                   &fld_index_features);
-                if (rc == 0)
+                if (rc == 0) {
                         LASSERT(dt_obj->do_index_ops != NULL);
                         LASSERT(dt_obj->do_index_ops != NULL);
-                else
+                        rc = fld_insert_igif_fld(fld, env);
+
+                        if (rc != 0) {
+                                CERROR("insert igif in fld! = %d\n", rc);
+                                lu_object_put(env, &dt_obj->do_lu);
+                                fld->lsf_obj = NULL;
+                        }
+                } else
                         CERROR("%s: File \"%s\" is not an index!\n",
                                fld->lsf_name, fld_index_name);
                         CERROR("%s: File \"%s\" is not an index!\n",
                                fld->lsf_name, fld_index_name);
+
+
         } else {
                 CERROR("%s: Can't find \"%s\" obj %d\n",
                        fld->lsf_name, fld_index_name, (int)PTR_ERR(dt_obj));
         } else {
                 CERROR("%s: Can't find \"%s\" obj %d\n",
                        fld->lsf_name, fld_index_name, (int)PTR_ERR(dt_obj));
index 7a86e2e..6b50b16 100644 (file)
 #include <dt_object.h>
 
 #include <libcfs/libcfs.h>
 #include <dt_object.h>
 
 #include <libcfs/libcfs.h>
-
 #include <lustre_req_layout.h>
 #include <lustre_fld.h>
 
 #include <lustre_req_layout.h>
 #include <lustre_fld.h>
 
+enum {
+        LUSTRE_FLD_INIT = 1 << 0,
+        LUSTRE_FLD_RUN  = 1 << 1
+};
+
+struct fld_stats {
+        __u64   fst_count;
+        __u64   fst_cache;
+        __u64   fst_inflight;
+};
+
+typedef int (*fld_hash_func_t) (struct lu_client_fld *, __u64);
+
+typedef struct lu_fld_target *
+(*fld_scan_func_t) (struct lu_client_fld *, __u64);
+
+struct lu_fld_hash {
+        const char              *fh_name;
+        fld_hash_func_t          fh_hash_func;
+        fld_scan_func_t          fh_scan_func;
+};
+
+struct fld_cache_entry {
+        struct list_head         fce_lru;
+        struct list_head         fce_list;
+        /**
+         * fld cache entries are sorted on range->lsr_start field. */
+        struct lu_seq_range      fce_range;
+};
+
+struct fld_cache {
+        /**
+         * Cache guard, protects fci_hash mostly because others immutable after
+         * init is finished.
+         */
+        spinlock_t               fci_lock;
+
+        /**
+         * Cache shrink threshold */
+        int                      fci_threshold;
+
+        /**
+         * Prefered number of cached entries */
+        int                      fci_cache_size;
+
+        /**
+         * Current number of cached entries. Protected by @fci_lock */
+        int                      fci_cache_count;
+
+        /**
+         * LRU list fld entries. */
+        struct list_head         fci_lru;
+
+        /**
+         * sorted fld entries. */
+        struct list_head         fci_entries_head;
+
+        /**
+         * Cache statistics. */
+        struct fld_stats         fci_stat;
+
+        /**
+         * Cache name used for debug and messages. */
+        char                     fci_name[80];
+};
+
 enum fld_op {
         FLD_CREATE = 0,
         FLD_DELETE = 1,
 enum fld_op {
         FLD_CREATE = 0,
         FLD_DELETE = 1,
@@ -71,30 +136,26 @@ enum {
         FLD_CLIENT_CACHE_THRESHOLD = 10
 };
 
         FLD_CLIENT_CACHE_THRESHOLD = 10
 };
 
-enum {
-        /*
-         * One page is used for hashtable. That is sizeof(struct hlist_head) *
-         * 1024.
-         */
-        FLD_CLIENT_HTABLE_SIZE     = (1024 * 1),
-
-        /* 
-         * Here 4 pages are used for hashtable of server cache. This is is
-         * because cache it self is 4 times bugger.
-         */
-        FLD_SERVER_HTABLE_SIZE     = (1024 * 4)
-};
-
 extern struct lu_fld_hash fld_hash[];
 
 #ifdef __KERNEL__
 extern struct lu_fld_hash fld_hash[];
 
 #ifdef __KERNEL__
+
 struct fld_thread_info {
         struct req_capsule *fti_pill;
         __u64               fti_key;
 struct fld_thread_info {
         struct req_capsule *fti_pill;
         __u64               fti_key;
-        __u64               fti_rec;
-        __u32               fti_flags;
+        struct lu_seq_range fti_rec;
+        struct lu_seq_range fti_lrange;
+        struct lu_seq_range fti_irange;
+        struct txn_param    fti_txn_param;
 };
 
 };
 
+
+struct thandle* fld_trans_start(struct lu_server_fld *fld,
+                                const struct lu_env *env, int credit);
+
+void fld_trans_stop(struct lu_server_fld *fld,
+                    const struct lu_env *env, struct thandle* th);
+
 int fld_index_init(struct lu_server_fld *fld,
                    const struct lu_env *env,
                    struct dt_device *dt);
 int fld_index_init(struct lu_server_fld *fld,
                    const struct lu_env *env,
                    struct dt_device *dt);
@@ -104,15 +165,20 @@ void fld_index_fini(struct lu_server_fld *fld,
 
 int fld_index_create(struct lu_server_fld *fld,
                      const struct lu_env *env,
 
 int fld_index_create(struct lu_server_fld *fld,
                      const struct lu_env *env,
-                     seqno_t seq, mdsno_t mds);
+                     const struct lu_seq_range *range,
+                     struct thandle *th);
 
 int fld_index_delete(struct lu_server_fld *fld,
                      const struct lu_env *env,
 
 int fld_index_delete(struct lu_server_fld *fld,
                      const struct lu_env *env,
-                     seqno_t seq);
+                     struct lu_seq_range *range,
+                     struct thandle *th);
 
 int fld_index_lookup(struct lu_server_fld *fld,
                      const struct lu_env *env,
 
 int fld_index_lookup(struct lu_server_fld *fld,
                      const struct lu_env *env,
-                     seqno_t seq, mdsno_t *mds);
+                     seqno_t seq, struct lu_seq_range *range);
+
+int fld_client_rpc(struct obd_export *exp,
+                   struct lu_seq_range *range, __u32 fld_op);
 
 #ifdef LPROCFS
 extern struct lprocfs_vars fld_server_proc_list[];
 
 #ifdef LPROCFS
 extern struct lprocfs_vars fld_server_proc_list[];
@@ -121,6 +187,22 @@ extern struct lprocfs_vars fld_client_proc_list[];
 
 #endif
 
 
 #endif
 
+struct fld_cache *fld_cache_init(const char *name,
+                                 int cache_size, int cache_threshold);
+
+void fld_cache_fini(struct fld_cache *cache);
+
+void fld_cache_flush(struct fld_cache *cache);
+
+void fld_cache_insert(struct fld_cache *cache,
+                      const struct lu_seq_range *range);
+
+void fld_cache_delete(struct fld_cache *cache,
+                      const struct lu_seq_range *range);
+
+int fld_cache_lookup(struct fld_cache *cache,
+                     const seqno_t seq, struct lu_seq_range *range);
+
 static inline const char *
 fld_target_name(struct lu_fld_target *tar)
 {
 static inline const char *
 fld_target_name(struct lu_fld_target *tar)
 {
index 39fb13b..dff5498 100644 (file)
@@ -164,26 +164,7 @@ fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
         RETURN(NULL);
 }
 
         RETURN(NULL);
 }
 
-static int fld_dht_hash(struct lu_client_fld *fld,
-                        seqno_t seq)
-{
-        /* XXX: here should be DHT hash */
-        return fld_rrb_hash(fld, seq);
-}
-
-static struct lu_fld_target *
-fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
-{
-        /* XXX: here should be DHT scan code */
-        return fld_rrb_scan(fld, seq);
-}
-
-struct lu_fld_hash fld_hash[3] = {
-        {
-                .fh_name = "DHT",
-                .fh_hash_func = fld_dht_hash,
-                .fh_scan_func = fld_dht_scan
-        },
+struct lu_fld_hash fld_hash[] = {
         {
                 .fh_name = "RRB",
                 .fh_hash_func = fld_rrb_hash,
         {
                 .fh_name = "RRB",
                 .fh_hash_func = fld_rrb_hash,
@@ -394,7 +375,6 @@ int fld_client_init(struct lu_client_fld *fld,
                 FLD_CLIENT_CACHE_THRESHOLD / 100;
 
         fld->lcf_cache = fld_cache_init(fld->lcf_name,
                 FLD_CLIENT_CACHE_THRESHOLD / 100;
 
         fld->lcf_cache = fld_cache_init(fld->lcf_name,
-                                        FLD_CLIENT_HTABLE_SIZE,
                                         cache_size, cache_threshold);
         if (IS_ERR(fld->lcf_cache)) {
                 rc = PTR_ERR(fld->lcf_cache);
                                         cache_size, cache_threshold);
         if (IS_ERR(fld->lcf_cache)) {
                 rc = PTR_ERR(fld->lcf_cache);
@@ -447,11 +427,11 @@ void fld_client_fini(struct lu_client_fld *fld)
 }
 EXPORT_SYMBOL(fld_client_fini);
 
 }
 EXPORT_SYMBOL(fld_client_fini);
 
-static int fld_client_rpc(struct obd_export *exp,
-                          struct md_fld *mf, __u32 fld_op)
+int fld_client_rpc(struct obd_export *exp,
+                   struct lu_seq_range *range, __u32 fld_op)
 {
         struct ptlrpc_request *req;
 {
         struct ptlrpc_request *req;
-        struct md_fld         *pmf;
+        struct lu_seq_range      *prange;
         __u32                 *op;
         int                    rc;
         ENTRY;
         __u32                 *op;
         int                    rc;
         ENTRY;
@@ -466,8 +446,8 @@ static int fld_client_rpc(struct obd_export *exp,
         op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC);
         *op = fld_op;
 
         op = req_capsule_client_get(&req->rq_pill, &RMF_FLD_OPC);
         *op = fld_op;
 
-        pmf = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
-        *pmf = *mf;
+        prange = req_capsule_client_get(&req->rq_pill, &RMF_FLD_MDFLD);
+        *prange = *range;
 
         ptlrpc_request_set_replen(req);
         req->rq_request_portal = FLD_REQUEST_PORTAL;
 
         ptlrpc_request_set_replen(req);
         req->rq_request_portal = FLD_REQUEST_PORTAL;
@@ -483,110 +463,32 @@ static int fld_client_rpc(struct obd_export *exp,
         if (rc)
                 GOTO(out_req, rc);
 
         if (rc)
                 GOTO(out_req, rc);
 
-        pmf = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD);
-        if (pmf == NULL)
+        prange = req_capsule_server_get(&req->rq_pill, &RMF_FLD_MDFLD);
+        if (prange == NULL)
                 GOTO(out_req, rc = -EFAULT);
                 GOTO(out_req, rc = -EFAULT);
-        *mf = *pmf;
+        *range = *prange;
         EXIT;
 out_req:
         ptlrpc_req_finished(req);
         return rc;
 }
 
         EXIT;
 out_req:
         ptlrpc_req_finished(req);
         return rc;
 }
 
-int fld_client_create(struct lu_client_fld *fld,
-                      seqno_t seq, mdsno_t mds,
-                      const struct lu_env *env)
-{
-        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
-        struct lu_fld_target *target;
-        int rc;
-        ENTRY;
-
-        fld->lcf_flags |= LUSTRE_FLD_RUN;
-        target = fld_client_get_target(fld, seq);
-        LASSERT(target != NULL);
-
-        CDEBUG(D_INFO, "%s: Create fld entry (seq: "LPX64"; mds: "
-               LPU64") on target %s (idx "LPU64")\n", fld->lcf_name,
-               seq, mds, fld_target_name(target), target->ft_idx);
-
-#ifdef __KERNEL__
-        if (target->ft_srv != NULL) {
-                LASSERT(env != NULL);
-                rc = fld_server_create(target->ft_srv, env, seq, mds);
-        } else {
-#endif
-                rc = fld_client_rpc(target->ft_exp, &md_fld, FLD_CREATE);
-#ifdef __KERNEL__
-        }
-#endif
-
-        if (rc == 0) {
-                /*
-                 * Do not return result of calling fld_cache_insert()
-                 * here. First of all because it may return -EEXIST. Another
-                 * reason is that, we do not want to stop proceeding because of
-                 * cache errors.
-                 */
-                fld_cache_insert(fld->lcf_cache, seq, mds);
-        } else {
-                CERROR("%s: Can't create FLD entry, rc %d\n",
-                       fld->lcf_name, rc);
-        }
-
-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_create);
-
-int fld_client_delete(struct lu_client_fld *fld, seqno_t seq,
-                      const struct lu_env *env)
-{
-        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
-        struct lu_fld_target *target;
-        int rc;
-        ENTRY;
-
-        fld->lcf_flags |= LUSTRE_FLD_RUN;
-        fld_cache_delete(fld->lcf_cache, seq);
-
-        target = fld_client_get_target(fld, seq);
-        LASSERT(target != NULL);
-
-        CDEBUG(D_INFO, "%s: Delete fld entry (seq: "LPX64") on "
-               "target %s (idx "LPU64")\n", fld->lcf_name, seq,
-               fld_target_name(target), target->ft_idx);
-
-#ifdef __KERNEL__
-        if (target->ft_srv != NULL) {
-                LASSERT(env != NULL);
-                rc = fld_server_delete(target->ft_srv,
-                                       env, seq);
-        } else {
-#endif
-                rc = fld_client_rpc(target->ft_exp,
-                                    &md_fld, FLD_DELETE);
-#ifdef __KERNEL__
-        }
-#endif
-
-        RETURN(rc);
-}
-EXPORT_SYMBOL(fld_client_delete);
-
 int fld_client_lookup(struct lu_client_fld *fld,
                       seqno_t seq, mdsno_t *mds,
                       const struct lu_env *env)
 {
 int fld_client_lookup(struct lu_client_fld *fld,
                       seqno_t seq, mdsno_t *mds,
                       const struct lu_env *env)
 {
-        struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
+        struct lu_seq_range res;
         struct lu_fld_target *target;
         int rc;
         ENTRY;
 
         fld->lcf_flags |= LUSTRE_FLD_RUN;
 
         struct lu_fld_target *target;
         int rc;
         ENTRY;
 
         fld->lcf_flags |= LUSTRE_FLD_RUN;
 
-        rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
-        if (rc == 0)
+        rc = fld_cache_lookup(fld->lcf_cache, seq, &res);
+        if (rc == 0) {
+                *mds = res.lsr_mdt;
                 RETURN(0);
                 RETURN(0);
+        }
 
         /* Can not find it in the cache */
         target = fld_client_get_target(fld, seq);
 
         /* Can not find it in the cache */
         target = fld_client_get_target(fld, seq);
@@ -596,45 +498,24 @@ int fld_client_lookup(struct lu_client_fld *fld,
                "target %s (idx "LPU64")\n", fld->lcf_name, seq,
                fld_target_name(target), target->ft_idx);
 
                "target %s (idx "LPU64")\n", fld->lcf_name, seq,
                fld_target_name(target), target->ft_idx);
 
+        res.lsr_start = seq;
 #ifdef __KERNEL__
         if (target->ft_srv != NULL) {
                 LASSERT(env != NULL);
                 rc = fld_server_lookup(target->ft_srv,
 #ifdef __KERNEL__
         if (target->ft_srv != NULL) {
                 LASSERT(env != NULL);
                 rc = fld_server_lookup(target->ft_srv,
-                                       env, seq, &md_fld.mf_mds);
+                                       env, seq, &res);
         } else {
 #endif
         } else {
 #endif
-                /*
-                 * insert the 'inflight' sequence. No need to protect that,
-                 * we are trying to reduce numbers of RPC but not restrict
-                 * to them exactly one
-                 */
-                fld_cache_insert_inflight(fld->lcf_cache, seq);
                 rc = fld_client_rpc(target->ft_exp,
                 rc = fld_client_rpc(target->ft_exp,
-                                    &md_fld, FLD_LOOKUP);
+                                    &res, FLD_LOOKUP);
 #ifdef __KERNEL__
         }
 #endif
 #ifdef __KERNEL__
         }
 #endif
-        if (seq < FID_SEQ_START) {
-                /*
-                 * The current solution for IGIF is to bind it to mds0.
-                 * In the future, this should be fixed once IGIF can be found
-                 * in FLD.
-                 */
-                md_fld.mf_mds = 0;
-                rc = 0;
-        }
 
         if (rc == 0) {
 
         if (rc == 0) {
-                *mds = md_fld.mf_mds;
+                *mds = res.lsr_mdt;
 
 
-                /*
-                 * Do not return error here as well. See previous comment in
-                 * same situation in function fld_client_create().
-                 */
-                fld_cache_insert(fld->lcf_cache, seq, *mds);
-        } else {
-                /* remove 'inflight' seq if it exists */
-                fld_cache_delete(fld->lcf_cache, seq);
+                fld_cache_insert(fld->lcf_cache, &res);
         }
         RETURN(rc);
 }
         }
         RETURN(rc);
 }
index 59fcca8..48bbc4b 100644 (file)
 #define LUSTRE_LOG_VERSION  0x00050000
 #define LUSTRE_MGS_VERSION  0x00060000
 
 #define LUSTRE_LOG_VERSION  0x00050000
 #define LUSTRE_MGS_VERSION  0x00060000
 
-typedef __u64 mdsno_t;
+typedef __u32 mdsno_t;
 typedef __u64 seqno_t;
 
 typedef __u64 seqno_t;
 
-struct lu_range {
-        __u64 lr_start;
-        __u64 lr_end;
-        /** stub for compact fld work. */
-        __u64 lr_padding;
+/**
+ * Describes a range of sequence, lsr_start is included but lsr_end is
+ * not in the range.
+ * Same structure is used in fld module where lsr_mdt field holds mdt id
+ * of the home mdt.
+ */
+
+struct lu_seq_range {
+        __u64 lsr_start;
+        __u64 lsr_end;
+        __u32 lsr_mdt;
+        __u32 lsr_padding;
 };
 
 /**
  * returns  width of given range \a r
  */
 
 };
 
 /**
  * returns  width of given range \a r
  */
 
-static inline __u64 range_space(const struct lu_range *range)
+static inline __u64 range_space(const struct lu_seq_range *range)
 {
 {
-        return range->lr_end - range->lr_start;
+        return range->lsr_end - range->lsr_start;
 }
 
 /**
  * initialize range to zero
  */
 }
 
 /**
  * initialize range to zero
  */
-static inline void range_init(struct lu_range *range)
+
+static inline void range_init(struct lu_seq_range *range)
 {
 {
-        range->lr_start = range->lr_end = 0;
+        range->lsr_start = range->lsr_end = range->lsr_mdt = 0;
 }
 
 /**
  * check if given seq id \a s is within given range \a r
  */
 }
 
 /**
  * check if given seq id \a s is within given range \a r
  */
-static inline int range_within(struct lu_range *range,
+
+static inline int range_within(const struct lu_seq_range *range,
                                __u64 s)
 {
                                __u64 s)
 {
-        return s >= range->lr_start && s < range->lr_end;
+        return s >= range->lsr_start && s < range->lsr_end;
 }
 
 /**
  * allocate \a w units of sequence from range \a from.
  */
 }
 
 /**
  * allocate \a w units of sequence from range \a from.
  */
-static inline void range_alloc(struct lu_range *to,
-                               struct lu_range *from,
+static inline void range_alloc(struct lu_seq_range *to,
+                               struct lu_seq_range *from,
                                __u64 width)
 {
                                __u64 width)
 {
-        to->lr_start = from->lr_start;
-        to->lr_end = from->lr_start + width;
-        from->lr_start += width;
+        to->lsr_start = from->lsr_start;
+        to->lsr_end = from->lsr_start + width;
+        from->lsr_start += width;
 }
 
 }
 
-static inline int range_is_sane(const struct lu_range *range)
+static inline int range_is_sane(const struct lu_seq_range *range)
 {
 {
-        return (range->lr_end >= range->lr_start);
+        return (range->lsr_end >= range->lsr_start);
 }
 
 }
 
-static inline int range_is_zero(const struct lu_range *range)
+static inline int range_is_zero(const struct lu_seq_range *range)
 {
 {
-        return (range->lr_start == 0 && range->lr_end == 0);
+        return (range->lsr_start == 0 && range->lsr_end == 0);
 }
 
 }
 
-static inline int range_is_exhausted(const struct lu_range *range)
+static inline int range_is_exhausted(const struct lu_seq_range *range)
+
 {
         return range_space(range) == 0;
 }
 
 {
         return range_space(range) == 0;
 }
 
-#define DRANGE "[%#16.16"LPF64"x-%#16.16"LPF64"x]"
+#define DRANGE "[%#16.16"LPF64"x-%#16.16"LPF64"x):%x"
 
 #define PRANGE(range)      \
 
 #define PRANGE(range)      \
-        (range)->lr_start, \
-        (range)->lr_end
+        (range)->lsr_start, \
+        (range)->lsr_end,    \
+        (range)->lsr_mdt
 
 /** \defgroup lu_fid lu_fid
  * @{ */
 
 /** \defgroup lu_fid lu_fid
  * @{ */
@@ -443,7 +454,7 @@ static inline int fid_is_zero(const struct lu_fid *fid)
 }
 
 extern void lustre_swab_lu_fid(struct lu_fid *fid);
 }
 
 extern void lustre_swab_lu_fid(struct lu_fid *fid);
-extern void lustre_swab_lu_range(struct lu_range *range);
+extern void lustre_swab_lu_seq_range(struct lu_seq_range *range);
 
 static inline int lu_fid_eq(const struct lu_fid *f0,
                             const struct lu_fid *f1)
 
 static inline int lu_fid_eq(const struct lu_fid *f0,
                             const struct lu_fid *f1)
@@ -1745,13 +1756,6 @@ struct lmv_desc {
 
 extern void lustre_swab_lmv_desc (struct lmv_desc *ld);
 
 
 extern void lustre_swab_lmv_desc (struct lmv_desc *ld);
 
-struct md_fld {
-        seqno_t mf_seq;
-        mdsno_t mf_mds;
-};
-
-extern void lustre_swab_md_fld (struct md_fld *mf);
-
 enum fld_rpc_opc {
         FLD_QUERY                       = 600,
         FLD_LAST_OPC,
 enum fld_rpc_opc {
         FLD_QUERY                       = 600,
         FLD_LAST_OPC,
index 470feae..7c8085f 100644 (file)
@@ -54,8 +54,8 @@ struct lu_site;
 struct lu_context;
 
 /* Whole sequences space range and zero range definitions */
 struct lu_context;
 
 /* Whole sequences space range and zero range definitions */
-extern const struct lu_range LUSTRE_SEQ_SPACE_RANGE;
-extern const struct lu_range LUSTRE_SEQ_ZERO_RANGE;
+extern const struct lu_seq_range LUSTRE_SEQ_SPACE_RANGE;
+extern const struct lu_seq_range LUSTRE_SEQ_ZERO_RANGE;
 extern const struct lu_fid LUSTRE_BFL_FID;
 
 enum {
 extern const struct lu_fid LUSTRE_BFL_FID;
 
 enum {
@@ -63,7 +63,7 @@ enum {
          * This is how may FIDs may be allocated in one sequence. 16384 for
          * now.
          */
          * This is how may FIDs may be allocated in one sequence. 16384 for
          * now.
          */
-        LUSTRE_SEQ_MAX_WIDTH = 0x0000000000004000ULL,
+        LUSTRE_SEQ_MAX_WIDTH = 0x0000000000000400ULL,
 
         /*
          * How many sequences may be allocate for meta-sequence (this is 128
 
         /*
          * How many sequences may be allocate for meta-sequence (this is 128
@@ -134,7 +134,7 @@ struct lu_client_seq {
          * clients, this contains meta-sequence range. And for servers this
          * contains super-sequence range.
          */
          * clients, this contains meta-sequence range. And for servers this
          * contains super-sequence range.
          */
-        struct lu_range         lcs_space;
+        struct lu_seq_range         lcs_space;
 
         /* Seq related proc */
         cfs_proc_dir_entry_t   *lcs_proc_dir;
 
         /* Seq related proc */
         cfs_proc_dir_entry_t   *lcs_proc_dir;
@@ -164,7 +164,7 @@ struct lu_client_seq {
 /* server sequence manager interface */
 struct lu_server_seq {
         /* Available sequences space */
 /* server sequence manager interface */
 struct lu_server_seq {
         /* Available sequences space */
-        struct lu_range         lss_space;
+        struct lu_seq_range         lss_space;
 
         /*
          * Device for server side seq manager needs (saving sequences to backing
 
         /*
          * Device for server side seq manager needs (saving sequences to backing
@@ -198,6 +198,11 @@ struct lu_server_seq {
          * LUSTRE_SEQ_SUPER_WIDTH and LUSTRE_SEQ_META_WIDTH.
          */
         __u64                   lss_width;
          * LUSTRE_SEQ_SUPER_WIDTH and LUSTRE_SEQ_META_WIDTH.
          */
         __u64                   lss_width;
+
+        /**
+         * Pointer to site object, required to access site fld.
+         */
+        struct md_site         *lss_site;
 };
 
 int seq_query(struct com_thread_info *info);
 };
 
 int seq_query(struct com_thread_info *info);
@@ -207,19 +212,20 @@ int seq_server_init(struct lu_server_seq *seq,
                     struct dt_device *dev,
                     const char *prefix,
                     enum lu_mgr_type type,
                     struct dt_device *dev,
                     const char *prefix,
                     enum lu_mgr_type type,
+                    struct md_site *ls,
                     const struct lu_env *env);
 
 void seq_server_fini(struct lu_server_seq *seq,
                      const struct lu_env *env);
 
 int seq_server_alloc_super(struct lu_server_seq *seq,
                     const struct lu_env *env);
 
 void seq_server_fini(struct lu_server_seq *seq,
                      const struct lu_env *env);
 
 int seq_server_alloc_super(struct lu_server_seq *seq,
-                           struct lu_range *in,
-                           struct lu_range *out,
+                           struct lu_seq_range *in,
+                           struct lu_seq_range *out,
                            const struct lu_env *env);
 
 int seq_server_alloc_meta(struct lu_server_seq *seq,
                            const struct lu_env *env);
 
 int seq_server_alloc_meta(struct lu_server_seq *seq,
-                          struct lu_range *in,
-                          struct lu_range *out,
+                          struct lu_seq_range *in,
+                          struct lu_seq_range *out,
                           const struct lu_env *env);
 
 int seq_server_set_cli(struct lu_server_seq *seq,
                           const struct lu_env *env);
 
 int seq_server_set_cli(struct lu_server_seq *seq,
@@ -241,7 +247,8 @@ int seq_client_alloc_fid(struct lu_client_seq *seq,
                          struct lu_fid *fid);
 
 /* Fids common stuff */
                          struct lu_fid *fid);
 
 /* Fids common stuff */
-int fid_is_local(struct lu_site *site, const struct lu_fid *fid);
+int fid_is_local(const struct lu_env *env,
+                 struct lu_site *site, const struct lu_fid *fid);
 
 /* fid locking */
 
 
 /* fid locking */
 
@@ -300,9 +307,32 @@ static inline __u64 fid_flatten(const struct lu_fid *fid)
 #define LUSTRE_SEQ_CTL_NAME "seq_ctl"
 
 /* Range common stuff */
 #define LUSTRE_SEQ_CTL_NAME "seq_ctl"
 
 /* Range common stuff */
-void range_cpu_to_le(struct lu_range *dst, const struct lu_range *src);
-void range_cpu_to_be(struct lu_range *dst, const struct lu_range *src);
-void range_le_to_cpu(struct lu_range *dst, const struct lu_range *src);
-void range_be_to_cpu(struct lu_range *dst, const struct lu_range *src);
+static inline void range_cpu_to_le(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+        dst->lsr_start = cpu_to_le64(src->lsr_start);
+        dst->lsr_end = cpu_to_le64(src->lsr_end);
+        dst->lsr_mdt = cpu_to_le32(src->lsr_mdt);
+}
+
+static inline void range_le_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+        dst->lsr_start = le64_to_cpu(src->lsr_start);
+        dst->lsr_end = le64_to_cpu(src->lsr_end);
+        dst->lsr_mdt = le32_to_cpu(src->lsr_mdt);
+}
+
+static inline void range_cpu_to_be(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+        dst->lsr_start = cpu_to_be64(src->lsr_start);
+        dst->lsr_end = cpu_to_be64(src->lsr_end);
+        dst->lsr_mdt = cpu_to_be32(src->lsr_mdt);
+}
+
+static inline void range_be_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
+{
+        dst->lsr_start = be64_to_cpu(src->lsr_start);
+        dst->lsr_end = be64_to_cpu(src->lsr_end);
+        dst->lsr_mdt = be32_to_cpu(src->lsr_mdt);
+}
 
 #endif /* __LINUX_FID_H */
 
 #endif /* __LINUX_FID_H */
index ec8be4f..ec65b99 100644 (file)
 
 struct lu_client_fld;
 struct lu_server_fld;
 
 struct lu_client_fld;
 struct lu_server_fld;
+struct lu_fld_hash;
+struct fld_cache;
 
 extern const struct dt_index_features fld_index_features;
 extern const char fld_index_name[];
 
 
 extern const struct dt_index_features fld_index_features;
 extern const char fld_index_name[];
 
-
-struct fld_stats {
-        __u64   fst_count;
-        __u64   fst_cache;
-        __u64   fst_inflight;
-};
-
 /*
  * FLD (Fid Location Database) interface.
  */
 /*
  * FLD (Fid Location Database) interface.
  */
@@ -64,7 +59,6 @@ enum {
         LUSTRE_CLI_FLD_HASH_RRB
 };
 
         LUSTRE_CLI_FLD_HASH_RRB
 };
 
-struct lu_server_fld;
 
 struct lu_fld_target {
         struct list_head         ft_chain;
 
 struct lu_fld_target {
         struct list_head         ft_chain;
@@ -73,134 +67,101 @@ struct lu_fld_target {
         __u64                    ft_idx;
 };
 
         __u64                    ft_idx;
 };
 
-typedef int
-(*fld_hash_func_t) (struct lu_client_fld *, __u64);
-
-typedef struct lu_fld_target *
-(*fld_scan_func_t) (struct lu_client_fld *, __u64);
-
-struct lu_fld_hash {
-        const char              *fh_name;
-        fld_hash_func_t          fh_hash_func;
-        fld_scan_func_t          fh_scan_func;
-};
-
-struct fld_cache_entry {
-        struct hlist_node        fce_list;
-        struct list_head         fce_lru;
-        mdsno_t                  fce_mds;
-        seqno_t                  fce_seq;
-        cfs_waitq_t              fce_waitq;
-        __u32                    fce_inflight:1,
-                                 fce_invalid:1;
-};
-
-struct fld_cache {
-        /*
-         * Cache guard, protects fci_hash mostly because others immutable after
-         * init is finished.
-         */
-        spinlock_t               fci_lock;
-
-        /* Cache shrink threshold */
-        int                      fci_threshold;
-
-        /* Prefered number of cached entries */
-        int                      fci_cache_size;
-
-        /* Current number of cached entries. Protected by @fci_lock */
-        int                      fci_cache_count;
-
-        /* Hash table size (number of collision lists) */
-        int                      fci_hash_size;
-
-        /* Hash table mask */
-        int                      fci_hash_mask;
-
-        /* Hash table for all collision lists */
-        struct hlist_head       *fci_hash_table;
-
-        /* Lru list */
-        struct list_head         fci_lru;
-
-        /* Cache statistics. */
-        struct fld_stats         fci_stat;
-        
-        /* Cache name used for debug and messages. */
-        char                     fci_name[80];
-};
-
 struct lu_server_fld {
 struct lu_server_fld {
-        /* Fld dir proc entry. */
+        /**
+         * Fld dir proc entry. */
         cfs_proc_dir_entry_t    *lsf_proc_dir;
 
         cfs_proc_dir_entry_t    *lsf_proc_dir;
 
-        /* /fld file object device */
+        /**
+         * /fld file object device */
         struct dt_object        *lsf_obj;
 
         struct dt_object        *lsf_obj;
 
-        /* Client FLD cache. */
+        /**
+         * super sequence controller export, needed to forward fld
+         * lookup  request. */
+        struct obd_export       *lsf_control_exp;
+
+        /**
+         * Client FLD cache. */
         struct fld_cache        *lsf_cache;
 
         struct fld_cache        *lsf_cache;
 
-        /* Protect index modifications */
-        struct semaphore         lsf_sem;
+        /**
+         * Protect index modifications */
+        struct mutex            lsf_lock;
 
 
-        /* Fld service name in form "fld-srv-lustre-MDTXXX" */
+        /**
+         * Fld service name in form "fld-srv-lustre-MDTXXX" */
         char                     lsf_name[80];
 };
 
         char                     lsf_name[80];
 };
 
-enum {
-        LUSTRE_FLD_INIT = 1 << 0,
-        LUSTRE_FLD_RUN  = 1 << 1
-};
-
 struct lu_client_fld {
 struct lu_client_fld {
-        /* Client side proc entry. */
+        /**
+         * Client side proc entry. */
         cfs_proc_dir_entry_t    *lcf_proc_dir;
 
         cfs_proc_dir_entry_t    *lcf_proc_dir;
 
-        /* List of exports client FLD knows about. */
+        /**
+         * List of exports client FLD knows about. */
         struct list_head         lcf_targets;
 
         struct list_head         lcf_targets;
 
-        /* Current hash to be used to chose an export. */
+        /**
+         * Current hash to be used to chose an export. */
         struct lu_fld_hash      *lcf_hash;
 
         struct lu_fld_hash      *lcf_hash;
 
-        /* Exports count. */
+        /**
+         * Exports count. */
         int                      lcf_count;
 
         int                      lcf_count;
 
-        /* Lock protecting exports list and fld_hash. */
+        /**
+         * Lock protecting exports list and fld_hash. */
         spinlock_t               lcf_lock;
 
         spinlock_t               lcf_lock;
 
-        /* Client FLD cache. */
+        /**
+         * Client FLD cache. */
         struct fld_cache        *lcf_cache;
 
         struct fld_cache        *lcf_cache;
 
-        /* Client fld proc entry name. */
+        /**
+         * Client fld proc entry name. */
         char                     lcf_name[80];
 
         const struct lu_context *lcf_ctx;
         char                     lcf_name[80];
 
         const struct lu_context *lcf_ctx;
-        
+
         int                      lcf_flags;
 };
 
         int                      lcf_flags;
 };
 
+/**
+ * number of blocks to reserve for particular operations. Should be function of
+ * ... something. Stub for now.
+ */
+enum {
+        /* one insert operation can involve two delete and one insert */
+        FLD_TXN_INDEX_INSERT_CREDITS  = 60,
+        FLD_TXN_INDEX_DELETE_CREDITS  = 20,
+};
+
 int fld_query(struct com_thread_info *info);
 
 /* Server methods */
 int fld_server_init(struct lu_server_fld *fld,
                     struct dt_device *dt,
                     const char *prefix,
 int fld_query(struct com_thread_info *info);
 
 /* Server methods */
 int fld_server_init(struct lu_server_fld *fld,
                     struct dt_device *dt,
                     const char *prefix,
-                    const struct lu_env *env);
+                    const struct lu_env *env,
+                    int mds_node_id);
 
 void fld_server_fini(struct lu_server_fld *fld,
                      const struct lu_env *env);
 
 int fld_server_create(struct lu_server_fld *fld,
                       const struct lu_env *env,
 
 void fld_server_fini(struct lu_server_fld *fld,
                      const struct lu_env *env);
 
 int fld_server_create(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq, mdsno_t mds);
+                      struct lu_seq_range *add_range,
+                      struct thandle *th);
 
 int fld_server_delete(struct lu_server_fld *fld,
                       const struct lu_env *env,
 
 int fld_server_delete(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq);
+                      struct lu_seq_range *range);
 
 int fld_server_lookup(struct lu_server_fld *fld,
                       const struct lu_env *env,
 
 int fld_server_lookup(struct lu_server_fld *fld,
                       const struct lu_env *env,
-                      seqno_t seq, mdsno_t *mds);
+                      seqno_t seq, struct lu_seq_range *range);
 
 /* Client methods */
 int fld_client_init(struct lu_client_fld *fld,
 
 /* Client methods */
 int fld_client_init(struct lu_client_fld *fld,
@@ -215,7 +176,7 @@ int fld_client_lookup(struct lu_client_fld *fld,
                       const struct lu_env *env);
 
 int fld_client_create(struct lu_client_fld *fld,
                       const struct lu_env *env);
 
 int fld_client_create(struct lu_client_fld *fld,
-                      seqno_t seq, mdsno_t mds,
+                      struct lu_seq_range *range,
                       const struct lu_env *env);
 
 int fld_client_delete(struct lu_client_fld *fld,
                       const struct lu_env *env);
 
 int fld_client_delete(struct lu_client_fld *fld,
@@ -228,27 +189,4 @@ int fld_client_add_target(struct lu_client_fld *fld,
 int fld_client_del_target(struct lu_client_fld *fld,
                           __u64 idx);
 
 int fld_client_del_target(struct lu_client_fld *fld,
                           __u64 idx);
 
-/* Cache methods */
-struct fld_cache *fld_cache_init(const char *name,
-                                 int hash_size,
-                                 int cache_size,
-                                 int cache_threshold);
-
-void fld_cache_fini(struct fld_cache *cache);
-
-void fld_cache_flush(struct fld_cache *cache);
-
-int fld_cache_insert(struct fld_cache *cache,
-                     seqno_t seq, mdsno_t mds);
-
-int fld_cache_insert_inflight(struct fld_cache *cache,
-                              seqno_t seq);
-
-void fld_cache_delete(struct fld_cache *cache,
-                      seqno_t seq);
-
-int
-fld_cache_lookup(struct fld_cache *cache,
-                 seqno_t seq, mdsno_t *mds);
-
 #endif
 #endif
index c2b9757..8f4f94c 100644 (file)
@@ -75,11 +75,11 @@ int lmv_fld_lookup(struct lmv_obd *lmv,
                 RETURN(rc);
         }
         
                 RETURN(rc);
         }
         
-        CDEBUG(D_INODE, "FLD lookup got mds #"LPU64" for fid="DFID"\n",
+        CDEBUG(D_INODE, "FLD lookup got mds #%x for fid="DFID"\n",
                *mds, PFID(fid));
 
         if (*mds >= lmv->desc.ld_tgt_count) {
                *mds, PFID(fid));
 
         if (*mds >= lmv->desc.ld_tgt_count) {
-                CERROR("FLD lookup got invalid mds #"LPU64" (max: %d) "
+                CERROR("FLD lookup got invalid mds #%x (max: %x) "
                        "for fid="DFID"\n", *mds, lmv->desc.ld_tgt_count,
                        PFID(fid));
                 rc = -EINVAL;
                        "for fid="DFID"\n", *mds, lmv->desc.ld_tgt_count,
                        PFID(fid));
                 rc = -EINVAL;
index 7e35465..cb1273b 100644 (file)
@@ -768,7 +768,7 @@ int lmv_allocate_slaves(struct obd_device *obd, struct lu_fid *pid,
         }
 
         CDEBUG(D_INODE, "Allocate new fid "DFID" for slave "
         }
 
         CDEBUG(D_INODE, "Allocate new fid "DFID" for slave "
-               "obj -> mds #"LPU64"\n", PFID(fid), mds);
+               "obj -> mds #%x\n", PFID(fid), mds);
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
index 004218e..7543a8c 100644 (file)
@@ -978,20 +978,7 @@ int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid,
         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
         if (rc > 0) {
                 LASSERT(fid_is_sane(fid));
         rc = obd_fid_alloc(tgt->ltd_exp, fid, NULL);
         if (rc > 0) {
                 LASSERT(fid_is_sane(fid));
-
-                /*
-                 * Client switches to new sequence, setup FLD.
-                 */
-                rc = fld_client_create(&lmv->lmv_fld, fid_seq(fid),
-                                       mds, NULL);
-                if (rc) {
-                        /*
-                         * Delete just allocated fid sequence in case
-                         * of fail back.
-                         */
-                        CERROR("Can't create fld entry, rc %d\n", rc);
-                        obd_fid_delete(tgt->ltd_exp, NULL);
-                }
+                rc = 0;
         }
 
         EXIT;
         }
 
         EXIT;
@@ -1498,7 +1485,7 @@ repeat:
         else if (rc)
                 RETURN(rc);
 
         else if (rc)
                 RETURN(rc);
 
-        CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #"LPU64"\n",
+        CDEBUG(D_INODE, "CREATE '%*s' on "DFID" -> mds #%x\n",
                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
                op_data->op_mds);
 
                op_data->op_namelen, op_data->op_name, PFID(&op_data->op_fid1),
                op_data->op_mds);
 
@@ -1970,7 +1957,7 @@ repeat:
                         RETURN(rc);
         }
 
                         RETURN(rc);
         }
 
-        CDEBUG(D_INODE, "Forward to mds #"LPU64" ("DFID")\n",
+        CDEBUG(D_INODE, "Forward to mds #%x ("DFID")\n",
                mds, PFID(&op_data->op_fid1));
 
         op_data->op_fsuid = current->fsuid;
                mds, PFID(&op_data->op_fid1));
 
         op_data->op_fsuid = current->fsuid;
index 1a5b7f7..c7932bf 100644 (file)
@@ -3373,7 +3373,7 @@ static void mdt_seq_adjust(const struct lu_env *env,
                           struct mdt_device *m, int lost)
 {
         struct md_site *ms = mdt_md_site(m);
                           struct mdt_device *m, int lost)
 {
         struct md_site *ms = mdt_md_site(m);
-        struct lu_range out;
+        struct lu_seq_range out;
         ENTRY;
 
         LASSERT(ms && ms->ms_server_seq);
         ENTRY;
 
         LASSERT(ms && ms->ms_server_seq);
@@ -3439,6 +3439,7 @@ static int mdt_seq_init(const struct lu_env *env,
                 rc = seq_server_init(ms->ms_control_seq,
                                      m->mdt_bottom, uuid,
                                      LUSTRE_SEQ_CONTROLLER,
                 rc = seq_server_init(ms->ms_control_seq,
                                      m->mdt_bottom, uuid,
                                      LUSTRE_SEQ_CONTROLLER,
+                                     ms,
                                      env);
 
                 if (rc)
                                      env);
 
                 if (rc)
@@ -3480,6 +3481,7 @@ static int mdt_seq_init(const struct lu_env *env,
         rc = seq_server_init(ms->ms_server_seq,
                              m->mdt_bottom, uuid,
                              LUSTRE_SEQ_SERVER,
         rc = seq_server_init(ms->ms_server_seq,
                              m->mdt_bottom, uuid,
                              LUSTRE_SEQ_SERVER,
+                             ms,
                              env);
         if (rc)
                 GOTO(out_seq_fini, rc = -ENOMEM);
                              env);
         if (rc)
                 GOTO(out_seq_fini, rc = -ENOMEM);
@@ -3634,7 +3636,8 @@ static int mdt_fld_init(const struct lu_env *env,
                 RETURN(rc = -ENOMEM);
 
         rc = fld_server_init(ms->ms_server_fld,
                 RETURN(rc = -ENOMEM);
 
         rc = fld_server_init(ms->ms_server_fld,
-                             m->mdt_bottom, uuid, env);
+                             m->mdt_bottom, uuid,
+                             env, ms->ms_node_id);
         if (rc) {
                 OBD_FREE_PTR(ms->ms_server_fld);
                 ms->ms_server_fld = NULL;
         if (rc) {
                 OBD_FREE_PTR(ms->ms_server_fld);
                 ms->ms_server_fld = NULL;
index df515ee..b76cca8 100644 (file)
@@ -107,12 +107,13 @@ void lustre_swab_lu_fid(struct lu_fid *fid)
 }
 EXPORT_SYMBOL(lustre_swab_lu_fid);
 
 }
 EXPORT_SYMBOL(lustre_swab_lu_fid);
 
-void lustre_swab_lu_range(struct lu_range *range)
+void lustre_swab_lu_seq_range(struct lu_seq_range *range)
 {
 {
-        __swab64s (&range->lr_start);
-        __swab64s (&range->lr_end);
+        __swab64s (&range->lsr_start);
+        __swab64s (&range->lsr_end);
+        __swab32s (&range->lsr_mdt);
 }
 }
-EXPORT_SYMBOL(lustre_swab_lu_range);
+EXPORT_SYMBOL(lustre_swab_lu_seq_range);
 
 void lustre_swab_llog_rec(struct llog_rec_hdr *rec, struct llog_rec_tail *tail)
 {
 
 void lustre_swab_llog_rec(struct llog_rec_hdr *rec, struct llog_rec_tail *tail)
 {
index bd5bb5b..0d4b6be 100644 (file)
@@ -3609,7 +3609,7 @@ static int osd_fid_lookup(const struct lu_env *env,
          * fids. Unfortunately it is somewhat expensive (does a
          * cache-lookup). Disabling it for production/acceptance-testing.
          */
          * fids. Unfortunately it is somewhat expensive (does a
          * cache-lookup). Disabling it for production/acceptance-testing.
          */
-        LASSERT(1 || fid_is_local(ldev->ld_site, fid));
+        LASSERT(1 || fid_is_local(env, ldev->ld_site, fid));
 
         ENTRY;
 
 
         ENTRY;
 
index e663cea..764957e 100644 (file)
@@ -656,7 +656,7 @@ EXPORT_SYMBOL(RMF_SEQ_OPC);
 
 const struct req_msg_field RMF_SEQ_RANGE =
         DEFINE_MSGF("seq_query_range", 0,
 
 const struct req_msg_field RMF_SEQ_RANGE =
         DEFINE_MSGF("seq_query_range", 0,
-                    sizeof(struct lu_range), lustre_swab_lu_range);
+                    sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
 EXPORT_SYMBOL(RMF_SEQ_RANGE);
 
 const struct req_msg_field RMF_FLD_OPC =
 EXPORT_SYMBOL(RMF_SEQ_RANGE);
 
 const struct req_msg_field RMF_FLD_OPC =
@@ -666,7 +666,7 @@ EXPORT_SYMBOL(RMF_FLD_OPC);
 
 const struct req_msg_field RMF_FLD_MDFLD =
         DEFINE_MSGF("fld_query_mdfld", 0,
 
 const struct req_msg_field RMF_FLD_MDFLD =
         DEFINE_MSGF("fld_query_mdfld", 0,
-                    sizeof(struct md_fld), lustre_swab_md_fld);
+                    sizeof(struct lu_seq_range), lustre_swab_lu_seq_range);
 EXPORT_SYMBOL(RMF_FLD_MDFLD);
 
 const struct req_msg_field RMF_MDT_BODY =
 EXPORT_SYMBOL(RMF_FLD_MDFLD);
 
 const struct req_msg_field RMF_MDT_BODY =
index 8739098..1cd90f0 100644 (file)
@@ -1956,12 +1956,6 @@ void lustre_swab_lmv_desc (struct lmv_desc *ld)
         __swab32s (&ld->ld_active_tgt_count);
         /* uuid endian insensitive */
 }
         __swab32s (&ld->ld_active_tgt_count);
         /* uuid endian insensitive */
 }
-/*end adding MDT by huanghua@clusterfs.com*/
-void lustre_swab_md_fld (struct md_fld *mf)
-{
-        __swab64s(&mf->mf_seq);
-        __swab64s(&mf->mf_mds);
-}
 
 static void print_lum (struct lov_user_md *lum)
 {
 
 static void print_lum (struct lov_user_md *lum)
 {
index 03adc07..363f399 100644 (file)
@@ -298,7 +298,6 @@ EXPORT_SYMBOL(lustre_msg_set_transno);
 EXPORT_SYMBOL(lustre_msg_set_status);
 EXPORT_SYMBOL(lustre_msg_set_conn_cnt);
 EXPORT_SYMBOL(lustre_swab_mgs_target_info);
 EXPORT_SYMBOL(lustre_msg_set_status);
 EXPORT_SYMBOL(lustre_msg_set_conn_cnt);
 EXPORT_SYMBOL(lustre_swab_mgs_target_info);
-EXPORT_SYMBOL(lustre_swab_md_fld);
 EXPORT_SYMBOL(lustre_swab_generic_32s);
 EXPORT_SYMBOL(lustre_swab_lustre_capa);
 EXPORT_SYMBOL(lustre_swab_lustre_capa_key);
 EXPORT_SYMBOL(lustre_swab_generic_32s);
 EXPORT_SYMBOL(lustre_swab_lustre_capa);
 EXPORT_SYMBOL(lustre_swab_lustre_capa_key);
index dd596d6..617f2bc 100644 (file)
@@ -235,8 +235,8 @@ load_modules() {
     load_module ptlrpc/ptlrpc
     load_module ptlrpc/gss/ptlrpc_gss
     [ "$USE_QUOTA" = "yes" -a "$LQUOTA" != "no" ] && load_module quota/lquota
     load_module ptlrpc/ptlrpc
     load_module ptlrpc/gss/ptlrpc_gss
     [ "$USE_QUOTA" = "yes" -a "$LQUOTA" != "no" ] && load_module quota/lquota
-    load_module fid/fid
     load_module fld/fld
     load_module fld/fld
+    load_module fid/fid
     load_module lmv/lmv
     load_module mdc/mdc
     load_module osc/osc
     load_module lmv/lmv
     load_module mdc/mdc
     load_module osc/osc
index 2611093..e5fd0f8 100644 (file)
@@ -50,7 +50,7 @@
 #define __REQ_LAYOUT_USER__ (1)
 
 #define lustre_swab_generic_32s NULL
 #define __REQ_LAYOUT_USER__ (1)
 
 #define lustre_swab_generic_32s NULL
-#define lustre_swab_lu_range NULL
+#define lustre_swab_lu_seq_range NULL
 #define lustre_swab_md_fld NULL
 #define lustre_swab_mdt_body NULL
 #define lustre_swab_mdt_epoch NULL
 #define lustre_swab_md_fld NULL
 #define lustre_swab_mdt_body NULL
 #define lustre_swab_mdt_epoch NULL
index 02e5746..f881c82 100644 (file)
@@ -2396,5 +2396,18 @@ void lustre_assert_wire_constants(void)
         LASSERTF((int)sizeof(((xattr_acl_header *)0)->a_entries) == 0, " found %lld\n",
                  (long long)(int)sizeof(((xattr_acl_header *)0)->a_entries));
 #endif
         LASSERTF((int)sizeof(((xattr_acl_header *)0)->a_entries) == 0, " found %lld\n",
                  (long long)(int)sizeof(((xattr_acl_header *)0)->a_entries));
 #endif
+
+        /* check fid range */
+        LASSERTF((int)sizeof(struct lu_seq_range) == 24, " found %lld\n",
+                 (long long)(int)sizeof(struct lu_seq_range));
+        LASSERTF((int)offsetof(struct lu_seq_range, lsr_start) == 0, " found %lld\n",
+                 (long long)(int)offsetof(struct lu_seq_range, lsr_start));
+        LASSERTF((int)offsetof(struct lu_seq_range, lsr_end) == 8, " found %lld\n",
+                 (long long)(int)offsetof(struct lu_seq_range, lsr_end));
+        LASSERTF((int)offsetof(struct lu_seq_range, lsr_mdt) == 16, " found %lld\n",
+                 (long long)(int)offsetof(struct lu_seq_range, lsr_mdt));
+        LASSERTF((int)offsetof(struct lu_seq_range, lsr_padding) == 20, " found %lld\n",
+                 (long long)(int)offsetof(struct lu_seq_range, lsr_padding));
+
 }
 
 }