Whamcloud - gitweb
- disable mds local connections, that is on mdt0 there will not be mdc0;
authoryury <yury>
Wed, 6 Sep 2006 10:32:41 +0000 (10:32 +0000)
committeryury <yury>
Wed, 6 Sep 2006 10:32:41 +0000 (10:32 +0000)
- update fld and seq mgr to not use local connections;
- for seq-mgr use different files for storing data of controller an regular seq server, fixing this in mkfs too;
- cleanups and fixes in fld and seq with names of fields;

18 files changed:
lustre/cmm/cmm_device.c
lustre/fid/fid_handler.c
lustre/fid/fid_internal.h
lustre/fid/fid_request.c
lustre/fid/fid_store.c
lustre/fld/fld_handler.c
lustre/fld/fld_index.c
lustre/fld/fld_internal.h
lustre/fld/fld_request.c
lustre/fld/lproc_fld.c
lustre/include/lu_object.h
lustre/include/lustre_fid.h
lustre/include/lustre_fld.h
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/mgs/mgs_llog.c
lustre/utils/mkfs_lustre.c

index eb6875c..f724c3f 100644 (file)
@@ -98,11 +98,12 @@ extern struct lu_device_type mdc_device_type;
 static int cmm_add_mdc(const struct lu_context *ctx,
                        struct cmm_device *cm, struct lustre_cfg *cfg)
 {
-        struct  lu_device_type *ldt = &mdc_device_type;
-        struct  lu_device *ld;
-        struct lu_site *ls;
-        struct  mdc_device *mc, *tmp;
+        struct lu_device_type *ldt = &mdc_device_type;
         char *p, *num = lustre_cfg_string(cfg, 2);
+        struct mdc_device *mc, *tmp;
+        struct lu_fld_target target;
+        struct lu_device *ld;
+        struct lu_site *ls;
         mdsno_t mdc_num;
         int rc;
         ENTRY;
@@ -153,7 +154,12 @@ static int cmm_add_mdc(const struct lu_context *ctx,
                 lu_device_get(cmm2lu_dev(cm));
 
                 ls = cm->cmm_md_dev.md_lu_dev.ld_site;
-                fld_client_add_target(ls->ls_client_fld, mc->mc_desc.cl_exp);
+
+                target.ft_srv = NULL;
+                target.ft_idx = mc->mc_num;
+                target.ft_exp = mc->mc_desc.cl_exp;
+        
+                fld_client_add_target(ls->ls_client_fld, &target);
         }
         RETURN(rc);
 }
@@ -162,16 +168,8 @@ static void cmm_device_shutdown(const struct lu_context *ctx,
                                 struct cmm_device *cm)
 {
         struct mdc_device *mc, *tmp;
-        struct lu_site *ls;
         ENTRY;
 
-        ls = cm->cmm_md_dev.md_lu_dev.ld_site;
-        if (ls->ls_client_fld != NULL) {
-                fld_client_fini(ls->ls_client_fld);
-                OBD_FREE_PTR(ls->ls_client_fld);
-                ls->ls_client_fld = NULL;
-        }
-
         /* finish all mdc devices */
         spin_lock(&cm->cmm_tgt_guard);
         list_for_each_entry_safe(mc, tmp, &cm->cmm_targets, mc_linkage) {
@@ -191,11 +189,9 @@ static int cmm_device_mount(const struct lu_context *ctx,
                             struct cmm_device *m, struct lustre_cfg *cfg)
 {
         const char *index = lustre_cfg_string(cfg, 2);
-        struct lu_site *ls;
-        char *p, *name_str;
-        int rc;
+        char *p;
         
-        LASSERT(index);
+        LASSERT(index != NULL);
 
         m->cmm_local_num = simple_strtol(index, &p, 10);
         if (*p) {
@@ -203,18 +199,7 @@ static int cmm_device_mount(const struct lu_context *ctx,
                 RETURN(-EINVAL);
         }
         
-        ls = m->cmm_md_dev.md_lu_dev.ld_site;
-        OBD_ALLOC_PTR(ls->ls_client_fld);
-        if (!ls->ls_client_fld)
-                RETURN(-ENOMEM);
-
-        name_str = lustre_cfg_string(cfg, 0);
-        rc = fld_client_init(ls->ls_client_fld, name_str,
-                             LUSTRE_CLI_FLD_HASH_DHT);
-        if (rc) {
-                CERROR("can't init FLD, err %d\n",  rc);        
-        }
-        RETURN(rc);
+        RETURN(0);
 }
 
 static int cmm_process_config(const struct lu_context *ctx,
index 518e369..7a04805 100644 (file)
@@ -76,8 +76,8 @@ int seq_server_set_cli(struct lu_server_seq *seq,
 
         if (cli == NULL) {
                 CDEBUG(D_INFO|D_WARNING, "%s: detached "
-                       "sequence mgr client %s\n", seq->lss_name,
-                       cli->lcs_exp->exp_client_uuid.uuid);
+                       "sequence mgr client %s\n",
+                       seq->lss_name, cli->lcs_name);
                 seq->lss_cli = cli;
                 RETURN(0);
         }
@@ -90,7 +90,7 @@ int seq_server_set_cli(struct lu_server_seq *seq,
 
         CDEBUG(D_INFO|D_WARNING, "%s: attached "
                "sequence client %s\n", seq->lss_name,
-               cli->lcs_exp->exp_client_uuid.uuid);
+               cli->lcs_name);
 
         /* asking client for new range, assign that range to ->seq_super and
          * write seq state to backing store should be atomic. */
@@ -104,9 +104,10 @@ int seq_server_set_cli(struct lu_server_seq *seq,
         if (range_is_zero(&seq->lss_super)) {
                 rc = seq_client_alloc_super(cli);
                 if (rc) {
+                        up(&seq->lss_sem);
                         CERROR("can't allocate super-sequence, "
                                "rc %d\n", rc);
-                        GOTO(out_up, rc);
+                        RETURN(rc);
                 }
 
                 /* take super-seq from client seq mgr */
@@ -122,10 +123,8 @@ int seq_server_set_cli(struct lu_server_seq *seq,
                 }
         }
 
-        EXIT;
-out_up:
         up(&seq->lss_sem);
-        return rc;
+        RETURN(rc);
 }
 EXPORT_SYMBOL(seq_server_set_cli);
 
@@ -172,9 +171,9 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq,
         RETURN(rc);
 }
 
-static int seq_server_alloc_super(struct lu_server_seq *seq,
-                                  struct lu_range *range,
-                                  const struct lu_context *ctx)
+int seq_server_alloc_super(struct lu_server_seq *seq,
+                           struct lu_range *range,
+                           const struct lu_context *ctx)
 {
         int rc;
         ENTRY;
@@ -231,9 +230,9 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
         RETURN(rc);
 }
 
-static int seq_server_alloc_meta(struct lu_server_seq *seq,
-                                 struct lu_range *range,
-                                 const struct lu_context *ctx)
+int seq_server_alloc_meta(struct lu_server_seq *seq,
+                          struct lu_range *range,
+                          const struct lu_context *ctx)
 {
         int rc;
         ENTRY;
@@ -485,13 +484,13 @@ static void seq_server_proc_fini(struct lu_server_seq *seq)
 
 int seq_server_init(struct lu_server_seq *seq,
                     struct dt_device *dev,
-                    const char *uuid,
+                    const char *prefix,
                     enum lu_mgr_type type,
                     const struct lu_context *ctx)
 {
         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
         
-        static  struct ptlrpc_service_conf seq_conf;
+        static struct ptlrpc_service_conf seq_conf;
         seq_conf = (typeof(seq_conf)) {
                 .psc_nbufs = MDS_NBUFS,
                 .psc_bufsize = MDS_BUFSIZE,
@@ -509,7 +508,7 @@ int seq_server_init(struct lu_server_seq *seq,
         ENTRY;
 
        LASSERT(dev != NULL);
-        LASSERT(uuid != NULL);
+        LASSERT(prefix != NULL);
 
         seq->lss_cli = NULL;
         seq->lss_type = type;
@@ -520,7 +519,7 @@ int seq_server_init(struct lu_server_seq *seq,
 
         snprintf(seq->lss_name, sizeof(seq->lss_name), "%s-%s-%s",
                  LUSTRE_SEQ_NAME, (is_srv ? "srv" : "ctl"),
-                 uuid);
+                 prefix);
 
         seq->lss_space = LUSTRE_SEQ_SPACE_RANGE;
         seq->lss_super = LUSTRE_SEQ_ZERO_RANGE;
index 556610f..a18aa0e 100644 (file)
 
 #ifdef __KERNEL__
 struct seq_store_record {
-        struct lu_range srv_space;
-        struct lu_range srv_super;
-        struct lu_range ctl_space;
-        struct lu_range ctl_super;
+        struct lu_range ssr_space;
+        struct lu_range ssr_super;
 };
 
 struct seq_thread_info {
index fcadda0..8ce5f3c 100644 (file)
@@ -122,8 +122,21 @@ out_req:
 /* request sequence-controller node to allocate new super-sequence. */
 static int __seq_client_alloc_super(struct lu_client_seq *seq)
 {
-        return seq_client_rpc(seq, &seq->lcs_range,
-                              SEQ_ALLOC_SUPER, "super");
+        int rc;
+        
+#ifdef __KERNEL__
+        if (seq->lcs_srv) {
+                rc = seq_server_alloc_super(seq->lcs_srv,
+                                            &seq->lcs_range,
+                                            seq->lcs_ctx);
+        } else {
+#endif
+                rc = seq_client_rpc(seq, &seq->lcs_range,
+                                    SEQ_ALLOC_SUPER, "super");
+#ifdef __KERNEL__
+        }
+#endif
+        return rc;
 }
 
 int seq_client_alloc_super(struct lu_client_seq *seq)
@@ -142,8 +155,21 @@ EXPORT_SYMBOL(seq_client_alloc_super);
 /* request sequence-controller node to allocate new meta-sequence. */
 static int __seq_client_alloc_meta(struct lu_client_seq *seq)
 {
-        return seq_client_rpc(seq, &seq->lcs_range,
-                              SEQ_ALLOC_META, "meta");
+        int rc;
+
+#ifdef __KERNEL__
+        if (seq->lcs_srv) {
+                rc = seq_server_alloc_meta(seq->lcs_srv,
+                                           &seq->lcs_range,
+                                           seq->lcs_ctx);
+        } else {
+#endif
+                rc = seq_client_rpc(seq, &seq->lcs_range,
+                                    SEQ_ALLOC_META, "meta");
+#ifdef __KERNEL__
+        }
+#endif
+        return rc;
 }
 
 int seq_client_alloc_meta(struct lu_client_seq *seq)
@@ -305,24 +331,37 @@ static void seq_client_proc_fini(struct lu_client_seq *seq)
 #endif
 
 int seq_client_init(struct lu_client_seq *seq,
-                    const char *uuid,
                     struct obd_export *exp,
-                    enum lu_cli_type type)
+                    enum lu_cli_type type,
+                    const char *prefix,
+                    struct lu_server_seq *srv,
+                    const struct lu_context *ctx)
 {
-        int rc = 0;
+        int rc;
         ENTRY;
 
-        LASSERT(exp != NULL);
+        LASSERT(seq != NULL);
+        LASSERT(prefix != NULL);
 
+        seq->lcs_ctx = ctx;
+        seq->lcs_exp = exp;
+        seq->lcs_srv = srv;
         seq->lcs_type = type;
         fid_zero(&seq->lcs_fid);
         range_zero(&seq->lcs_range);
         sema_init(&seq->lcs_sem, 1);
-        seq->lcs_exp = class_export_get(exp);
         seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
 
+        if (exp == NULL) {
+                LASSERT(seq->lcs_ctx != NULL);
+                LASSERT(seq->lcs_srv != NULL);
+        } else {
+                LASSERT(seq->lcs_exp != NULL);
+                seq->lcs_exp = class_export_get(seq->lcs_exp);
+        }
+
         snprintf(seq->lcs_name, sizeof(seq->lcs_name),
-                 "%s-cli-%s", LUSTRE_SEQ_NAME, uuid);
+                 "%s-cli-%s", LUSTRE_SEQ_NAME, prefix);
 
         rc = seq_client_proc_init(seq);
         if (rc)
@@ -345,7 +384,10 @@ void seq_client_fini(struct lu_client_seq *seq)
                 seq->lcs_exp = NULL;
         }
 
-        CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager\n");
+        seq->lcs_srv = NULL;
+
+        CDEBUG(D_INFO|D_WARNING,
+               "Client Sequence Manager\n");
 
         EXIT;
 }
index 8a27b5f..7aa3006 100644 (file)
@@ -68,34 +68,14 @@ int seq_store_write(struct lu_server_seq *seq,
         info = lu_context_key_get(ctx, &seq_thread_key);
         LASSERT(info != NULL);
 
-        rc = dt_obj->do_body_ops->dbo_read(ctx, dt_obj,
-                                           (char *)&info->sti_record,
-                                           sizeof(info->sti_record), &pos);
-        if (rc < 0) {
-                CERROR("can't read seq data from store, rc %d\n", rc);
-                RETURN(rc);
-        }
-
-        if (rc > 0 && rc != sizeof(info->sti_record)) {
-                CWARN("read only %d bytes of %d, old data - zeroing out\n",
-                       rc, sizeof(info->sti_record));
-                memset(&info->sti_record, 0, sizeof(info->sti_record));
-        }
-
         /* stub here, will fix it later */
         info->sti_txn.tp_credits = SEQ_TXN_STORE_CREDITS;
 
         th = dt_dev->dd_ops->dt_trans_start(ctx, dt_dev, &info->sti_txn);
         if (!IS_ERR(th)) {
                 /* store ranges in le format */
-                if (seq->lss_type == LUSTRE_SEQ_SERVER) {
-                        range_to_le(&info->sti_record.srv_space, &seq->lss_space);
-                        range_to_le(&info->sti_record.srv_super, &seq->lss_super);
-                } else {
-                        range_to_le(&info->sti_record.ctl_space, &seq->lss_space);
-                        range_to_le(&info->sti_record.ctl_super, &seq->lss_super);
-
-                }
+                range_to_le(&info->sti_record.ssr_space, &seq->lss_space);
+                range_to_le(&info->sti_record.ssr_super, &seq->lss_super);
  
                 rc = dt_obj->do_body_ops->dbo_write(ctx, dt_obj,
                                                     (char *)&info->sti_record,
@@ -139,13 +119,9 @@ int seq_store_read(struct lu_server_seq *seq,
                                            sizeof(info->sti_record), &pos);
         
         if (rc == sizeof(info->sti_record)) {
-                if (seq->lss_type == LUSTRE_SEQ_SERVER) {
-                        seq->lss_space = info->sti_record.srv_space;
-                        seq->lss_super = info->sti_record.srv_super;
-                } else {
-                        seq->lss_space = info->sti_record.ctl_space;
-                        seq->lss_super = info->sti_record.ctl_super;
-                }
+                seq->lss_space = info->sti_record.ssr_space;
+                seq->lss_super = info->sti_record.ssr_super;
+
                 lustre_swab_lu_range(&seq->lss_space);
                 lustre_swab_lu_range(&seq->lss_super);
 
@@ -171,13 +147,17 @@ int seq_store_init(struct lu_server_seq *seq,
 {
         struct dt_object *dt_obj;
         struct lu_fid fid;
+        const char *name;
         int rc;
         ENTRY;
 
         LASSERT(seq->lss_md_service == NULL);
         LASSERT(seq->lss_dt_service == NULL);
 
-        dt_obj = dt_store_open(ctx, dt, "seq", &fid);
+        name = seq->lss_type == LUSTRE_SEQ_SERVER ?
+                "seq_srv" : "seq_ctl";
+        
+        dt_obj = dt_store_open(ctx, dt, name, &fid);
         if (!IS_ERR(dt_obj)) {
                 seq->lss_obj = dt_obj;
                rc = 0;
index 6c5f99f..27366fd 100644 (file)
@@ -93,28 +93,31 @@ static void __exit fld_mod_exit(void)
 }
 
 /* insert index entry and update cache */
-static int fld_server_create(struct lu_server_fld *fld,
-                             const struct lu_context *ctx,
-                             seqno_t seq, mdsno_t mds)
+int fld_server_create(struct lu_server_fld *fld,
+                      const struct lu_context *ctx,
+                      seqno_t seq, mdsno_t mds)
 {
         return fld_index_create(fld, ctx, seq, mds);
 }
+EXPORT_SYMBOL(fld_server_create);
 
 /* delete index entry */
-static int fld_server_delete(struct lu_server_fld *fld,
-                             const struct lu_context *ctx,
-                             seqno_t seq)
+int fld_server_delete(struct lu_server_fld *fld,
+                      const struct lu_context *ctx,
+                      seqno_t seq)
 {
         return fld_index_delete(fld, ctx, seq);
 }
+EXPORT_SYMBOL(fld_server_delete);
 
 /* issue on-disk index lookup */
-static int fld_server_lookup(struct lu_server_fld *fld,
-                             const struct lu_context *ctx,
-                             seqno_t seq, mdsno_t *mds)
+int fld_server_lookup(struct lu_server_fld *fld,
+                      const struct lu_context *ctx,
+                      seqno_t seq, mdsno_t *mds)
 {
         return fld_index_lookup(fld, ctx, seq, mds);
 }
+EXPORT_SYMBOL(fld_server_lookup);
 
 static int fld_server_handle(struct lu_server_fld *fld,
                              const struct lu_context *ctx,
@@ -265,7 +268,7 @@ int fid_is_local(struct lu_site *site, const struct lu_fid *fid)
                 mdsno_t mds;
                 int rc;
 
-                rc = fld_cache_lookup(site->ls_client_fld->fld_cache,
+                rc = fld_cache_lookup(site->ls_client_fld->lcf_cache,
                                       fid_seq(fid), &mds);
                 if (rc == 0)
                         result = (mds == site->ls_node_id);
@@ -282,19 +285,19 @@ static int fld_server_proc_init(struct lu_server_fld *fld)
         int rc = 0;
         ENTRY;
 
-        fld->fld_proc_dir = lprocfs_register(fld->fld_name,
+        fld->lsf_proc_dir = lprocfs_register(fld->lsf_name,
                                              proc_lustre_root,
                                              fld_server_proc_list, fld);
-        if (IS_ERR(fld->fld_proc_dir)) {
-                rc = PTR_ERR(fld->fld_proc_dir);
+        if (IS_ERR(fld->lsf_proc_dir)) {
+                rc = PTR_ERR(fld->lsf_proc_dir);
                 RETURN(rc);
         }
 
-        fld->fld_proc_entry = lprocfs_register("services",
-                                               fld->fld_proc_dir,
+        fld->lsf_proc_entry = lprocfs_register("services",
+                                               fld->lsf_proc_dir,
                                                NULL, NULL);
-        if (IS_ERR(fld->fld_proc_entry)) {
-                rc = PTR_ERR(fld->fld_proc_entry);
+        if (IS_ERR(fld->lsf_proc_entry)) {
+                rc = PTR_ERR(fld->lsf_proc_entry);
                 GOTO(out_cleanup, rc);
         }
         RETURN(rc);
@@ -307,16 +310,16 @@ out_cleanup:
 static void fld_server_proc_fini(struct lu_server_fld *fld)
 {
         ENTRY;
-        if (fld->fld_proc_entry != NULL) {
-                if (!IS_ERR(fld->fld_proc_entry))
-                        lprocfs_remove(fld->fld_proc_entry);
-                fld->fld_proc_entry = NULL;
+        if (fld->lsf_proc_entry != NULL) {
+                if (!IS_ERR(fld->lsf_proc_entry))
+                        lprocfs_remove(fld->lsf_proc_entry);
+                fld->lsf_proc_entry = NULL;
         }
 
-        if (fld->fld_proc_dir != NULL) {
-                if (!IS_ERR(fld->fld_proc_dir))
-                        lprocfs_remove(fld->fld_proc_dir);
-                fld->fld_proc_dir = NULL;
+        if (fld->lsf_proc_dir != NULL) {
+                if (!IS_ERR(fld->lsf_proc_dir))
+                        lprocfs_remove(fld->lsf_proc_dir);
+                fld->lsf_proc_dir = NULL;
         }
         EXIT;
 }
@@ -338,7 +341,8 @@ int fld_server_init(struct lu_server_fld *fld,
                     const char *uuid)
 {
         int rc;
-        struct ptlrpc_service_conf fld_conf = {
+        static struct ptlrpc_service_conf fld_conf;
+        fld_conf = (typeof(fld_conf)) {
                 .psc_nbufs            = MDS_NBUFS,
                 .psc_bufsize          = MDS_BUFSIZE,
                 .psc_max_req_size     = FLD_MAXREQSIZE,
@@ -351,7 +355,7 @@ int fld_server_init(struct lu_server_fld *fld,
         };
         ENTRY;
 
-        snprintf(fld->fld_name, sizeof(fld->fld_name),
+        snprintf(fld->lsf_name, sizeof(fld->lsf_name),
                  "%s-srv-%s", LUSTRE_FLD_NAME, uuid);
 
         rc = fld_index_init(fld, ctx, dt);
@@ -362,12 +366,12 @@ int fld_server_init(struct lu_server_fld *fld,
         if (rc)
                 GOTO(out, rc);
 
-        fld->fld_service =
+        fld->lsf_service =
                 ptlrpc_init_svc_conf(&fld_conf, fld_handle,
                                      LUSTRE_FLD_NAME,
-                                     fld->fld_proc_entry, NULL);
-        if (fld->fld_service != NULL)
-                rc = ptlrpc_start_threads(NULL, fld->fld_service,
+                                     fld->lsf_proc_entry, NULL);
+        if (fld->lsf_service != NULL)
+                rc = ptlrpc_start_threads(NULL, fld->lsf_service,
                                           LUSTRE_FLD_NAME);
         else
                 rc = -ENOMEM;
@@ -387,13 +391,12 @@ void fld_server_fini(struct lu_server_fld *fld,
 {
         ENTRY;
 
-        if (fld->fld_service != NULL) {
-                ptlrpc_unregister_service(fld->fld_service);
-                fld->fld_service = NULL;
+        if (fld->lsf_service != NULL) {
+                ptlrpc_unregister_service(fld->lsf_service);
+                fld->lsf_service = NULL;
         }
 
         fld_server_proc_fini(fld);
-
         fld_index_fini(fld, ctx);
         
         EXIT;
index e1a847f..159b03f 100644 (file)
@@ -101,14 +101,14 @@ int fld_index_create(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
                      seqno_t seq, mdsno_t mds)
 {
-        struct dt_object *dt_obj = fld->fld_obj;
+        struct dt_object *dt_obj = fld->lsf_obj;
         struct dt_device *dt_dev;
         struct txn_param txn;
         struct thandle *th;
         int rc;
         ENTRY;
 
-        dt_dev = lu2dt_dev(fld->fld_obj->do_lu.lo_dev);
+        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
         
         /* stub here, will fix it later */
         txn.tp_credits = FLD_TXN_INDEX_INSERT_CREDITS;
@@ -128,14 +128,14 @@ int fld_index_delete(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
                      seqno_t seq)
 {
-        struct dt_object *dt_obj = fld->fld_obj;
+        struct dt_object *dt_obj = fld->lsf_obj;
         struct dt_device *dt_dev;
         struct txn_param txn;
         struct thandle *th;
         int rc;
         ENTRY;
 
-        dt_dev = lu2dt_dev(fld->fld_obj->do_lu.lo_dev);
+        dt_dev = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
         txn.tp_credits = FLD_TXN_INDEX_DELETE_CREDITS;
         th = dt_dev->dd_ops->dt_trans_start(ctx, dt_dev, &txn);
         if (!IS_ERR(th)) {
@@ -151,7 +151,7 @@ int fld_index_lookup(struct lu_server_fld *fld,
                      const struct lu_context *ctx,
                      seqno_t seq, mdsno_t *mds)
 {
-        struct dt_object *dt_obj = fld->fld_obj;
+        struct dt_object *dt_obj = fld->lsf_obj;
         struct dt_rec    *rec = fld_rec(ctx, 0);
         int rc;
         ENTRY;
@@ -176,17 +176,18 @@ int fld_index_init(struct lu_server_fld *fld,
          * lu_context_key has to be registered before threads are started,
          * check this.
          */
-        LASSERT(fld->fld_service == NULL);
+        LASSERT(fld->lsf_service == NULL);
 
         dt_obj = dt_store_open(ctx, dt, fld_index_name, &fid);
         if (!IS_ERR(dt_obj)) {
-                fld->fld_obj = dt_obj;
+                fld->lsf_obj = dt_obj;
                 rc = dt_obj->do_ops->do_index_try(ctx, dt_obj,
                                                   &fld_index_features);
                 if (rc == 0)
                         LASSERT(dt_obj->do_index_ops != NULL);
                 else
-                        CERROR("\"%s\" is not an index!\n", fld_index_name);
+                        CERROR("\"%s\" is not an index!\n",
+                               fld_index_name);
         } else {
                 CERROR("cannot find \"%s\" obj %d\n",
                        fld_index_name, (int)PTR_ERR(dt_obj));
@@ -200,10 +201,10 @@ void fld_index_fini(struct lu_server_fld *fld,
                     const struct lu_context *ctx)
 {
         ENTRY;
-        if (fld->fld_obj != NULL) {
-                if (!IS_ERR(fld->fld_obj))
-                        lu_object_put(ctx, &fld->fld_obj->do_lu);
-                fld->fld_obj = NULL;
+        if (fld->lsf_obj != NULL) {
+                if (!IS_ERR(fld->lsf_obj))
+                        lu_object_put(ctx, &fld->lsf_obj->do_lu);
+                fld->lsf_obj = NULL;
         }
         EXIT;
 }
index bbb2a7b..8734c5a 100644 (file)
@@ -85,4 +85,13 @@ extern struct lprocfs_vars fld_client_proc_list[];
 
 #endif
 
+static inline const char *
+fld_target_name(struct lu_fld_target *tar)
+{
+        if (tar->ft_srv != NULL)
+                return tar->ft_srv->lsf_name;
+
+        return tar->ft_exp->exp_client_uuid.uuid;
+}
+
 #endif
index d82758d..c626b10 100644 (file)
 static int fld_rrb_hash(struct lu_client_fld *fld,
                         seqno_t seq)
 {
-        LASSERT(fld->fld_count > 0);
-        return do_div(seq, fld->fld_count);
+        LASSERT(fld->lcf_count > 0);
+        return do_div(seq, fld->lcf_count);
 }
 
-static struct fld_target *
+static struct lu_fld_target *
 fld_rrb_scan(struct lu_client_fld *fld, seqno_t seq)
 {
-        struct fld_target *target;
+        struct lu_fld_target *target;
         int hash;
         ENTRY;
 
         hash = fld_rrb_hash(fld, seq);
 
-        list_for_each_entry(target, &fld->fld_targets, fldt_chain) {
-                if (target->fldt_idx == hash)
+        list_for_each_entry(target, &fld->lcf_targets, ft_chain) {
+                if (target->ft_idx == hash)
                         RETURN(target);
         }
 
@@ -86,7 +86,7 @@ static int fld_dht_hash(struct lu_client_fld *fld,
         return fld_rrb_hash(fld, seq);
 }
 
-static struct fld_target *
+static struct lu_fld_target *
 fld_dht_scan(struct lu_client_fld *fld, seqno_t seq)
 {
         /* XXX: here should be DHT scan code */
@@ -109,18 +109,18 @@ struct lu_fld_hash fld_hash[3] = {
         }
 };
 
-static struct fld_target *
+static struct lu_fld_target *
 fld_client_get_target(struct lu_client_fld *fld,
                       seqno_t seq)
 {
-        struct fld_target *target;
+        struct lu_fld_target *target;
         ENTRY;
 
-        LASSERT(fld->fld_hash != NULL);
+        LASSERT(fld->lcf_hash != NULL);
 
-        spin_lock(&fld->fld_lock);
-        target = fld->fld_hash->fh_scan_func(fld, seq);
-        spin_unlock(&fld->fld_lock);
+        spin_lock(&fld->lcf_lock);
+        target = fld->lcf_hash->fh_scan_func(fld, seq);
+        spin_unlock(&fld->lcf_lock);
 
         RETURN(target);
 }
@@ -130,39 +130,45 @@ fld_client_get_target(struct lu_client_fld *fld,
  * of FLD module.
  */
 int fld_client_add_target(struct lu_client_fld *fld,
-                          struct obd_export *exp)
+                          struct lu_fld_target *tar)
 {
-        struct client_obd *cli = &exp->exp_obd->u.cli;
-        struct fld_target *target, *tmp;
+        const char *tar_name = fld_target_name(tar);
+        struct lu_fld_target *target, *tmp;
         ENTRY;
 
-        LASSERT(exp != NULL);
+        LASSERT(tar != NULL);
+        LASSERT(tar_name != NULL);
+        LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
 
-        CDEBUG(D_INFO|D_WARNING, "%s: adding export %s\n",
-              fld->fld_name, cli->cl_target_uuid.uuid);
+        CDEBUG(D_INFO|D_WARNING, "%s: adding target %s\n",
+              fld->lcf_name, tar_name);
 
         OBD_ALLOC_PTR(target);
         if (target == NULL)
                 RETURN(-ENOMEM);
 
-        spin_lock(&fld->fld_lock);
-        list_for_each_entry(tmp, &fld->fld_targets, fldt_chain) {
-                if (obd_uuid_equals(&tmp->fldt_exp->exp_client_uuid,
-                                    &exp->exp_client_uuid))
+        spin_lock(&fld->lcf_lock);
+        list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
+                const char *tmp_name = fld_target_name(tmp);
+                
+                if (strlen(tar_name) == strlen(tmp_name) &&
+                    strcmp(tmp_name, tar_name) == 0)
                 {
-                        spin_unlock(&fld->fld_lock);
+                        spin_unlock(&fld->lcf_lock);
                         OBD_FREE_PTR(target);
                         RETURN(-EEXIST);
                 }
         }
 
-        target->fldt_exp = class_export_get(exp);
-        target->fldt_idx = fld->fld_count;
+        target->ft_exp = tar->ft_exp;
+        target->ft_srv = tar->ft_srv;
+        target->ft_idx = tar->ft_idx;
 
-        list_add_tail(&target->fldt_chain,
-                      &fld->fld_targets);
-        fld->fld_count++;
-        spin_unlock(&fld->fld_lock);
+        list_add_tail(&target->ft_chain,
+                      &fld->lcf_targets);
+        
+        fld->lcf_count++;
+        spin_unlock(&fld->lcf_lock);
 
         RETURN(0);
 }
@@ -170,26 +176,24 @@ EXPORT_SYMBOL(fld_client_add_target);
 
 /* remove export from FLD */
 int fld_client_del_target(struct lu_client_fld *fld,
-                          struct obd_export *exp)
+                          __u64 idx)
 {
-        struct fld_target *target, *tmp;
+        struct lu_fld_target *target, *tmp;
         ENTRY;
 
-        spin_lock(&fld->fld_lock);
+        spin_lock(&fld->lcf_lock);
         list_for_each_entry_safe(target, tmp,
-                                 &fld->fld_targets, fldt_chain) {
-                if (obd_uuid_equals(&target->fldt_exp->exp_client_uuid,
-                                    &exp->exp_client_uuid))
-                {
-                        fld->fld_count--;
-                        list_del(&target->fldt_chain);
-                        spin_unlock(&fld->fld_lock);
-                        class_export_put(target->fldt_exp);
+                                 &fld->lcf_targets, ft_chain) {
+                if (target->ft_idx == idx) {
+                        fld->lcf_count--;
+                        list_del(&target->ft_chain);
+                        spin_unlock(&fld->lcf_lock);
+                        class_export_put(target->ft_exp);
                         OBD_FREE_PTR(target);
                         RETURN(0);
                 }
         }
-        spin_unlock(&fld->fld_lock);
+        spin_unlock(&fld->lcf_lock);
         RETURN(-ENOENT);
 }
 EXPORT_SYMBOL(fld_client_del_target);
@@ -202,17 +206,17 @@ static int fld_client_proc_init(struct lu_client_fld *fld)
         int rc;
         ENTRY;
 
-        fld->fld_proc_dir = lprocfs_register(fld->fld_name,
+        fld->lcf_proc_dir = lprocfs_register(fld->lcf_name,
                                              proc_lustre_root,
                                              NULL, NULL);
 
-        if (IS_ERR(fld->fld_proc_dir)) {
+        if (IS_ERR(fld->lcf_proc_dir)) {
                 CERROR("LProcFS failed in fld-init\n");
-                rc = PTR_ERR(fld->fld_proc_dir);
+                rc = PTR_ERR(fld->lcf_proc_dir);
                 RETURN(rc);
         }
 
-        rc = lprocfs_add_vars(fld->fld_proc_dir,
+        rc = lprocfs_add_vars(fld->lcf_proc_dir,
                               fld_client_proc_list, fld);
         if (rc) {
                 CERROR("can't init FLD "
@@ -230,10 +234,10 @@ out_cleanup:
 static void fld_client_proc_fini(struct lu_client_fld *fld)
 {
         ENTRY;
-        if (fld->fld_proc_dir) {
-                if (!IS_ERR(fld->fld_proc_dir))
-                        lprocfs_remove(fld->fld_proc_dir);
-                fld->fld_proc_dir = NULL;
+        if (fld->lcf_proc_dir) {
+                if (!IS_ERR(fld->lcf_proc_dir))
+                        lprocfs_remove(fld->lcf_proc_dir);
+                fld->lcf_proc_dir = NULL;
         }
         EXIT;
 }
@@ -261,7 +265,8 @@ static inline int hash_is_sane(int hash)
 #define FLD_CACHE_THRESHOLD 10
 
 int fld_client_init(struct lu_client_fld *fld,
-                    const char *uuid, int hash)
+                    const char *prefix, int hash,
+                    const struct lu_context *ctx)
 {
 #ifdef __KERNEL__
         int cache_size, cache_threshold;
@@ -272,17 +277,18 @@ int fld_client_init(struct lu_client_fld *fld,
         LASSERT(fld != NULL);
 
         if (!hash_is_sane(hash)) {
-                CERROR("wrong hash function %#x\n", hash);
+                CERROR("Wrong hash function %#x\n", hash);
                 RETURN(-EINVAL);
         }
 
-        INIT_LIST_HEAD(&fld->fld_targets);
-        spin_lock_init(&fld->fld_lock);
-        fld->fld_hash = &fld_hash[hash];
-        fld->fld_count = 0;
+        fld->lcf_count = 0;
+        fld->lcf_ctx = ctx;
+        spin_lock_init(&fld->lcf_lock);
+        fld->lcf_hash = &fld_hash[hash];
+        INIT_LIST_HEAD(&fld->lcf_targets);
 
-        snprintf(fld->fld_name, sizeof(fld->fld_name),
-                 "%s-cli-%s", LUSTRE_FLD_NAME, uuid);
+        snprintf(fld->lcf_name, sizeof(fld->lcf_name),
+                 "%s-cli-%s", LUSTRE_FLD_NAME, prefix);
 
 #ifdef __KERNEL__
         cache_size = FLD_CACHE_SIZE /
@@ -291,12 +297,12 @@ int fld_client_init(struct lu_client_fld *fld,
         cache_threshold = cache_size *
                 FLD_CACHE_THRESHOLD / 100;
 
-        fld->fld_cache = fld_cache_init(FLD_HTABLE_SIZE,
+        fld->lcf_cache = fld_cache_init(FLD_HTABLE_SIZE,
                                         cache_size,
                                         cache_threshold);
-        if (IS_ERR(fld->fld_cache)) {
-                rc = PTR_ERR(fld->fld_cache);
-                fld->fld_cache = NULL;
+        if (IS_ERR(fld->lcf_cache)) {
+                rc = PTR_ERR(fld->lcf_cache);
+                fld->lcf_cache = NULL;
                 GOTO(out, rc);
         }
 #endif
@@ -311,33 +317,33 @@ out:
         else
                 CDEBUG(D_INFO|D_WARNING,
                        "Client FLD, using \"%s\" hash\n",
-                       fld->fld_hash->fh_name);
+                       fld->lcf_hash->fh_name);
         return rc;
 }
 EXPORT_SYMBOL(fld_client_init);
 
 void fld_client_fini(struct lu_client_fld *fld)
 {
-        struct fld_target *target, *tmp;
+        struct lu_fld_target *target, *tmp;
         ENTRY;
 
         fld_client_proc_fini(fld);
 
-        spin_lock(&fld->fld_lock);
+        spin_lock(&fld->lcf_lock);
         list_for_each_entry_safe(target, tmp,
-                                 &fld->fld_targets, fldt_chain) {
-                fld->fld_count--;
-                list_del(&target->fldt_chain);
-                class_export_put(target->fldt_exp);
+                                 &fld->lcf_targets, ft_chain) {
+                fld->lcf_count--;
+                list_del(&target->ft_chain);
+                class_export_put(target->ft_exp);
                 OBD_FREE_PTR(target);
         }
-        spin_unlock(&fld->fld_lock);
+        spin_unlock(&fld->lcf_lock);
 
 #ifdef __KERNEL__
-        if (fld->fld_cache != NULL) {
-                if (!IS_ERR(fld->fld_cache))
-                        fld_cache_fini(fld->fld_cache);
-                fld->fld_cache = NULL;
+        if (fld->lcf_cache != NULL) {
+                if (!IS_ERR(fld->lcf_cache))
+                        fld_cache_fini(fld->lcf_cache);
+                fld->lcf_cache = NULL;
         }
 #endif
 
@@ -400,14 +406,26 @@ int fld_client_create(struct lu_client_fld *fld,
                       seqno_t seq, mdsno_t mds)
 {
         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
-        struct fld_target *target;
+        struct lu_fld_target *target;
         int rc;
         ENTRY;
 
         target = fld_client_get_target(fld, seq);
         LASSERT(target != NULL);
 
-        rc = fld_client_rpc(target->fldt_exp, &md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+        if (target->ft_srv != NULL) {
+                LASSERT(fld->lcf_ctx != NULL);
+                rc = fld_server_create(target->ft_srv,
+                                       fld->lcf_ctx,
+                                       seq, mds);
+        } else {
+#endif
+                rc = fld_client_rpc(target->ft_exp,
+                                    &md_fld, FLD_CREATE);
+#ifdef __KERNEL__
+        }
+#endif
 
         if (rc == 0) {
                 /*
@@ -416,7 +434,7 @@ int fld_client_create(struct lu_client_fld *fld,
                  * reason is that, we do not want to stop proceeding because of
                  * cache errors. --umka
                  */
-                fld_cache_insert(fld->fld_cache, seq, mds);
+                fld_cache_insert(fld->lcf_cache, seq, mds);
         }
         RETURN(rc);
 }
@@ -426,17 +444,28 @@ int fld_client_delete(struct lu_client_fld *fld,
                       seqno_t seq)
 {
         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
-        struct fld_target *target;
+        struct lu_fld_target *target;
         int rc;
         ENTRY;
 
-        fld_cache_delete(fld->fld_cache, seq);
+        fld_cache_delete(fld->lcf_cache, seq);
 
         target = fld_client_get_target(fld, seq);
         LASSERT(target != NULL);
 
-        rc = fld_client_rpc(target->fldt_exp,
-                            &md_fld, FLD_DELETE);
+#ifdef __KERNEL__
+        if (target->ft_srv != NULL) {
+                LASSERT(fld->lcf_ctx != NULL);
+                rc = fld_server_delete(target->ft_srv,
+                                       fld->lcf_ctx,
+                                       seq);
+        } else {
+#endif
+                rc = fld_client_rpc(target->ft_exp,
+                                    &md_fld, FLD_DELETE);
+#ifdef __KERNEL__
+        }
+#endif
 
         RETURN(rc);
 }
@@ -446,12 +475,12 @@ int fld_client_lookup(struct lu_client_fld *fld,
                       seqno_t seq, mdsno_t *mds)
 {
         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
-        struct fld_target *target;
+        struct lu_fld_target *target;
         int rc;
         ENTRY;
 
         /* lookup it in the cache */
-        rc = fld_cache_lookup(fld->fld_cache, seq, mds);
+        rc = fld_cache_lookup(fld->lcf_cache, seq, mds);
         if (rc == 0)
                 RETURN(0);
 
@@ -459,8 +488,19 @@ int fld_client_lookup(struct lu_client_fld *fld,
         target = fld_client_get_target(fld, seq);
         LASSERT(target != NULL);
 
-        rc = fld_client_rpc(target->fldt_exp,
-                            &md_fld, FLD_LOOKUP);
+#ifdef __KERNEL__
+        if (target->ft_srv != NULL) {
+                LASSERT(fld->lcf_ctx != NULL);
+                rc = fld_server_lookup(target->ft_srv,
+                                       fld->lcf_ctx,
+                                       seq, mds);
+        } else {
+#endif
+                rc = fld_client_rpc(target->ft_exp,
+                                    &md_fld, FLD_LOOKUP);
+#ifdef __KERNEL__
+        }
+#endif
         if (rc == 0) {
                 *mds = md_fld.mf_mds;
 
@@ -468,7 +508,7 @@ int fld_client_lookup(struct lu_client_fld *fld,
                  * Do not return error here as well. See previous comment in
                  * same situation in function fld_client_create(). --umka
                  */
-                fld_cache_insert(fld->fld_cache, seq, *mds);
+                fld_cache_insert(fld->lcf_cache, seq, *mds);
         }
         RETURN(rc);
 }
index 817db2f..6327506 100644 (file)
@@ -53,27 +53,25 @@ fld_proc_read_targets(char *page, char **start, off_t off,
                       int count, int *eof, void *data)
 {
         struct lu_client_fld *fld = (struct lu_client_fld *)data;
-        struct fld_target *target;
+        struct lu_fld_target *target;
        int total = 0, rc;
        ENTRY;
 
         LASSERT(fld != NULL);
 
-        spin_lock(&fld->fld_lock);
+        spin_lock(&fld->lcf_lock);
         list_for_each_entry(target,
-                            &fld->fld_targets, fldt_chain)
+                            &fld->lcf_targets, ft_chain)
         {
-                struct client_obd *cli = &target->fldt_exp->exp_obd->u.cli;
-                
                 rc = snprintf(page, count, "%s\n",
-                              cli->cl_target_uuid.uuid);
+                              fld_target_name(target));
                 page += rc;
                 count -= rc;
                 total += rc;
                 if (count == 0)
                         break;
         }
-        spin_unlock(&fld->fld_lock);
+        spin_unlock(&fld->lcf_lock);
        RETURN(total);
 }
 
@@ -87,10 +85,10 @@ fld_proc_read_hash(char *page, char **start, off_t off,
 
         LASSERT(fld != NULL);
 
-        spin_lock(&fld->fld_lock);
+        spin_lock(&fld->lcf_lock);
         rc = snprintf(page, count, "%s\n",
-                      fld->fld_hash->fh_name);
-        spin_unlock(&fld->fld_lock);
+                      fld->lcf_hash->fh_name);
+        spin_unlock(&fld->lcf_lock);
 
        RETURN(rc);
 }
@@ -117,12 +115,12 @@ fld_proc_write_hash(struct file *file, const char *buffer,
         }
 
         if (hash != NULL) {
-                spin_lock(&fld->fld_lock);
-                fld->fld_hash = hash;
-                spin_unlock(&fld->fld_lock);
+                spin_lock(&fld->lcf_lock);
+                fld->lcf_hash = hash;
+                spin_unlock(&fld->lcf_lock);
 
-                CDEBUG(D_WARNING, "FLD(cli): changed hash to \"%s\"\n",
-                       hash->fh_name);
+                CDEBUG(D_WARNING, "%s: changed hash to \"%s\"\n",
+                       fld->lcf_name, hash->fh_name);
         }
        
         RETURN(count);
index 7e5f57e..2529ac1 100644 (file)
@@ -500,7 +500,6 @@ struct lu_site {
          */
         int                   ls_hash_mask;
 
-
         /*
          * LRU list, updated on each access to object. Protected by
          * ->ls_guard.
@@ -548,8 +547,6 @@ struct lu_site {
          * Client Seq Manager
          */
         struct lu_client_seq *ls_client_seq;
-
-        /* Sequence controller node */
         struct obd_export    *ls_client_exp;
 
         /* statistical counters. Protected by nothing, races are accepted. */
index f8ae53a..c026364 100644 (file)
@@ -61,6 +61,8 @@ enum lu_cli_type {
         LUSTRE_SEQ_DATA
 };
 
+struct lu_server_seq;
+
 /* client sequence manager interface */
 struct lu_client_seq {
         /* sequence-controller export. */
@@ -88,9 +90,12 @@ struct lu_client_seq {
         /* sequence width, that is how many objects may be allocated in one
          * sequence. Default value for it is LUSTRE_SEQ_MAX_WIDTH. */
         __u64                   lcs_width;
+
+        /* seq-server for direct talking */
+        struct lu_server_seq   *lcs_srv;
+        const struct lu_context      *lcs_ctx;
 };
 
-#ifdef __KERNEL__
 /* server sequence manager interface */
 struct lu_server_seq {
         /* available sequence space */
@@ -135,28 +140,37 @@ struct lu_server_seq {
         __u64                   lss_super_width;
         __u64                   lss_meta_width;
 };
-#endif
 
 #ifdef __KERNEL__
 
 int seq_server_init(struct lu_server_seq *seq,
                     struct dt_device *dev,
-                    const char *uuid,
+                    const char *prefix,
                     enum lu_mgr_type type,
                     const struct lu_context *ctx);
 
 void seq_server_fini(struct lu_server_seq *seq,
                      const struct lu_context *ctx);
 
+int seq_server_alloc_super(struct lu_server_seq *seq,
+                           struct lu_range *range,
+                           const struct lu_context *ctx);
+
+int seq_server_alloc_meta(struct lu_server_seq *seq,
+                          struct lu_range *range,
+                          const struct lu_context *ctx);
+
 int seq_server_set_cli(struct lu_server_seq *seq,
                        struct lu_client_seq *cli,
                        const struct lu_context *ctx);
 #endif
 
 int seq_client_init(struct lu_client_seq *seq,
-                    const char *uuid,
                     struct obd_export *exp,
-                    enum lu_cli_type type);
+                    enum lu_cli_type type,
+                    const char *prefix,
+                    struct lu_server_seq *srv,
+                    const struct lu_context *ctx);
 
 void seq_client_fini(struct lu_client_seq *seq);
 
index 6549d3d..4faf665 100644 (file)
@@ -40,14 +40,20 @@ enum {
         LUSTRE_CLI_FLD_HASH_RRB
 };
 
-struct fld_target {
-        struct list_head   fldt_chain;
-        struct obd_export *fldt_exp;
-        __u64              fldt_idx;
+struct lu_server_fld;
+
+struct lu_fld_target {
+        struct list_head         ft_chain;
+        struct obd_export       *ft_exp;
+        struct lu_server_fld    *ft_srv;
+        __u64                    ft_idx;
 };
 
-typedef int (*fld_hash_func_t) (struct lu_client_fld *, __u64);
-typedef struct fld_target * (*fld_scan_func_t) (struct lu_client_fld *, __u64);
+typedef int
+(*fld_hash_func_t) (struct lu_client_fld *, __u64);
+
+typedef struct lu_fld_target *
+(*fld_scan_func_t) (struct lu_client_fld *, __u64);
 
 struct lu_fld_hash {
         const char              *fh_name;
@@ -57,19 +63,19 @@ struct lu_fld_hash {
 
 struct lu_server_fld {
         /* service proc entry */
-        cfs_proc_dir_entry_t    *fld_proc_entry;
+        cfs_proc_dir_entry_t    *lsf_proc_entry;
 
         /* fld dir proc entry */
-        cfs_proc_dir_entry_t    *fld_proc_dir;
+        cfs_proc_dir_entry_t    *lsf_proc_dir;
 
         /* pointer to started server service */
-        struct ptlrpc_service   *fld_service;
+        struct ptlrpc_service   *lsf_service;
 
         /* /fld file object device */
-        struct dt_object        *fld_obj;
+        struct dt_object        *lsf_obj;
 
         /* fld service name in form "fld-MDTXXX" */
-        char                     fld_name[80];
+        char                     lsf_name[80];
 };
 
 struct fld_cache_entry {
@@ -110,25 +116,27 @@ struct fld_cache_info {
 
 struct lu_client_fld {
         /* client side proc entry */
-        cfs_proc_dir_entry_t    *fld_proc_dir;
+        cfs_proc_dir_entry_t    *lcf_proc_dir;
 
         /* list of exports client FLD knows about */
-        struct list_head         fld_targets;
+        struct list_head         lcf_targets;
 
         /* current hash to be used to chose an export */
-        struct lu_fld_hash      *fld_hash;
+        struct lu_fld_hash      *lcf_hash;
 
         /* exports count */
-        int                      fld_count;
+        int                      lcf_count;
 
         /* lock protecting exports list and fld_hash */
-        spinlock_t               fld_lock;
+        spinlock_t               lcf_lock;
 
         /* client FLD cache */
-        struct fld_cache_info   *fld_cache;
+        struct fld_cache_info   *lcf_cache;
 
         /* client fld proc entry name */
-        char                     fld_name[80];
+        char                     lcf_name[80];
+
+        const struct lu_context       *lcf_ctx;
 };
 
 /* server methods */
@@ -140,10 +148,22 @@ int fld_server_init(struct lu_server_fld *fld,
 void fld_server_fini(struct lu_server_fld *fld,
                      const struct lu_context *ctx);
 
+int fld_server_create(struct lu_server_fld *fld,
+                      const struct lu_context *ctx,
+                      seqno_t seq, mdsno_t mds);
+
+int fld_server_delete(struct lu_server_fld *fld,
+                      const struct lu_context *ctx,
+                      seqno_t seq);
+
+int fld_server_lookup(struct lu_server_fld *fld,
+                      const struct lu_context *ctx,
+                      seqno_t seq, mdsno_t *mds);
+
 /* client methods */
 int fld_client_init(struct lu_client_fld *fld,
-                    const char *uuid,
-                    int hash);
+                    const char *prefix, int hash,
+                    const struct lu_context *ctx);
 
 void fld_client_fini(struct lu_client_fld *fld);
 
@@ -157,10 +177,10 @@ int fld_client_delete(struct lu_client_fld *fld,
                       seqno_t seq);
 
 int fld_client_add_target(struct lu_client_fld *fld,
-                          struct obd_export *exp);
+                          struct lu_fld_target *tar);
 
 int fld_client_del_target(struct lu_client_fld *fld,
-                          struct obd_export *exp);
+                          __u64 idx);
 
 /* cache methods */
 struct fld_cache_info *fld_cache_init(int hash_size,
index 39a80b2..072bfad 100644 (file)
@@ -299,6 +299,7 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         struct lustre_handle conn = {0, };
         struct obd_device *mdc_obd;
         struct obd_export *mdc_exp;
+        struct lu_fld_target target;
         int rc;
 #ifdef __KERNEL__
         struct proc_dir_entry *lmv_proc_dir;
@@ -337,7 +338,12 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
         }
 
         mdc_exp = class_conn2export(&conn);
-        fld_client_add_target(&lmv->lmv_fld, mdc_exp);
+
+        target.ft_srv = NULL;
+        target.ft_exp = mdc_exp;
+        target.ft_idx = tgt->idx;
+        
+        fld_client_add_target(&lmv->lmv_fld, &target);
 
         mdc_data = &class_exp2cliimp(mdc_exp)->imp_connect_data;
 
@@ -860,7 +866,7 @@ static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
        }
 #endif
         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
-                             LUSTRE_CLI_FLD_HASH_DHT);
+                             LUSTRE_CLI_FLD_HASH_DHT, NULL);
         if (rc) {
                 CERROR("can't init FLD, err %d\n",
                        rc);
index c1fa898..3ed3ccf 100644 (file)
@@ -1155,7 +1155,7 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
 static int mdc_fid_init(struct obd_export *exp)
 {
         struct client_obd *cli = &exp->exp_obd->u.cli;
-        char *uuid;
+        char *prefix;
         int rc;
         ENTRY;
 
@@ -1163,17 +1163,18 @@ static int mdc_fid_init(struct obd_export *exp)
         if (cli->cl_seq == NULL)
                 RETURN(-ENOMEM);
 
-        OBD_ALLOC(uuid, MAX_OBD_NAME + 5);
-        if (uuid == NULL)
+        OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+        if (prefix == NULL)
                 GOTO(out_free_seq, rc = -ENOMEM);
 
-        snprintf(uuid, MAX_OBD_NAME + 5, "srv-%s",
+        snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s",
                  exp->exp_obd->obd_name);
 
         /* init client side sequence-manager */
-        rc = seq_client_init(cli->cl_seq, uuid,
-                             exp, LUSTRE_SEQ_METADATA);
-        OBD_FREE(uuid, MAX_OBD_NAME + 5);
+        rc = seq_client_init(cli->cl_seq, exp, 
+                             LUSTRE_SEQ_METADATA,
+                             prefix, NULL, NULL);
+        OBD_FREE(prefix, MAX_OBD_NAME + 5);
         if (rc)
                 GOTO(out_free_seq, rc);
 
index be62a1b..e183507 100644 (file)
@@ -1354,7 +1354,6 @@ static int mdt_req_handle(struct mdt_thread_info *info,
         RETURN(rc);
 }
 
-
 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
 {
         lh->mlh_lh.cookie = 0ull;
@@ -1907,11 +1906,19 @@ static int mdt_seq_fini(const struct lu_context *ctx,
                 OBD_FREE_PTR(ls->ls_server_seq);
                 ls->ls_server_seq = NULL;
         }
+        
         if (ls && ls->ls_control_seq) {
                 seq_server_fini(ls->ls_control_seq, ctx);
                 OBD_FREE_PTR(ls->ls_control_seq);
                 ls->ls_control_seq = NULL;
         }
+
+        if (ls && ls->ls_client_seq) {
+                seq_client_fini(ls->ls_client_seq);
+                OBD_FREE_PTR(ls->ls_client_seq);
+                ls->ls_client_seq = NULL;
+        }
+
         RETURN(0);
 }
 
@@ -1920,40 +1927,86 @@ static int mdt_seq_init(const struct lu_context *ctx,
                         struct mdt_device *m)
 {
         struct lu_site *ls;
+        char *prefix;
         int rc;
         ENTRY;
 
         ls = m->mdt_md_dev.md_lu_dev.ld_site;
 
-        /* sequence-controller node */
+        /*
+         * This is sequence-controller node. Init seq-controller server on local
+         * MDT.
+         */
         if (ls->ls_node_id == 0) {
                 LASSERT(ls->ls_control_seq == NULL);
+
                 OBD_ALLOC_PTR(ls->ls_control_seq);
+                if (ls->ls_control_seq == NULL)
+                        RETURN(-ENOMEM);
 
-                if (ls->ls_control_seq != NULL) {
-                        rc = seq_server_init(ls->ls_control_seq,
-                                             m->mdt_bottom, uuid,
-                                             LUSTRE_SEQ_CONTROLLER,
-                                             ctx);
-                } else
-                        rc = -ENOMEM;
+                rc = seq_server_init(ls->ls_control_seq,
+                                     m->mdt_bottom, uuid,
+                                     LUSTRE_SEQ_CONTROLLER,
+                                     ctx);
+
+                if (rc)
+                        GOTO(out_seq_fini, rc);
+                
+                OBD_ALLOC_PTR(ls->ls_client_seq);
+                if (ls->ls_client_seq == NULL)
+                        GOTO(out_seq_fini, rc = -ENOMEM);
+
+                OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+                if (prefix == NULL) {
+                        OBD_FREE_PTR(ls->ls_client_seq);
+                        GOTO(out_seq_fini, rc = -ENOMEM);
+                }
+
+                snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
+                         uuid);
+
+                /*
+                 * Init seq-controller client after seq-controller server is
+                 * ready. Pass ls->ls_control_seq to it for direct talking.
+                 */
+                rc = seq_client_init(ls->ls_client_seq, NULL,
+                                     LUSTRE_SEQ_METADATA, prefix,
+                                     ls->ls_control_seq, ctx);
+                OBD_FREE(prefix, MAX_OBD_NAME + 5);
+
+                if (rc)
+                        GOTO(out_seq_fini, rc);
         }
 
+        /* Init seq-server on local MDT */
         LASSERT(ls->ls_server_seq == NULL);
+        
         OBD_ALLOC_PTR(ls->ls_server_seq);
+        if (ls->ls_server_seq == NULL)
+                GOTO(out_seq_fini, rc = -ENOMEM);
 
-        if (ls->ls_server_seq != NULL) {
-                rc = seq_server_init(ls->ls_server_seq,
-                                     m->mdt_bottom, uuid,
-                                     LUSTRE_SEQ_SERVER,
-                                     ctx);
-        } else
-                rc = -ENOMEM;
+        rc = seq_server_init(ls->ls_server_seq,
+                             m->mdt_bottom, uuid,
+                             LUSTRE_SEQ_SERVER,
+                             ctx);
+        if (rc)
+                GOTO(out_seq_fini, rc = -ENOMEM);
 
+        /* Assign seq-controller client to local seq-server. */
+        if (ls->ls_node_id == 0) {
+                LASSERT(ls->ls_client_seq != NULL);
+                
+                rc = seq_server_set_cli(ls->ls_server_seq,
+                                        ls->ls_client_seq,
+                                        ctx);
+        }
+        
+        EXIT;
+out_seq_fini:
         if (rc)
                 mdt_seq_fini(ctx, m);
 
-        RETURN(rc);
+        return rc;
 }
 
 /*
@@ -1986,9 +2039,9 @@ static int mdt_seq_init_cli(const struct lu_context *ctx,
                 RETURN(-EINVAL);
         }
 
-        /* check if this is first MDC add and controller is not yet
+        /* check if this is adding the first MDC and controller is not yet
          * initialized. */
-        if (index != 0 || ls->ls_client_exp)
+        if (index != 0 || ls->ls_client_seq)
                 RETURN(0);
 
         uuid_str = lustre_cfg_string(cfg, 1);
@@ -2021,19 +2074,20 @@ static int mdt_seq_init_cli(const struct lu_context *ctx,
                         OBD_ALLOC_PTR(ls->ls_client_seq);
 
                         if (ls->ls_client_seq != NULL) {
-                                char *uuid;
+                                char *prefix;
 
-                                OBD_ALLOC(uuid, MAX_OBD_NAME + 5);
-                                if (!uuid)
+                                OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
+                                if (!prefix)
                                         RETURN(-ENOMEM);
 
-                                snprintf(uuid, MAX_OBD_NAME + 5, "ctl-%s",
-                                                        mdc->obd_name);
+                                snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
+                                         mdc->obd_name);
 
                                 rc = seq_client_init(ls->ls_client_seq,
-                                                     uuid, ls->ls_client_exp,
-                                                     LUSTRE_SEQ_METADATA);
-                                OBD_FREE(uuid, MAX_OBD_NAME + 5);
+                                                     ls->ls_client_exp,
+                                                     LUSTRE_SEQ_METADATA,
+                                                     prefix, NULL, NULL);
+                                OBD_FREE(prefix, MAX_OBD_NAME + 5);
                         } else
                                 rc = -ENOMEM;
 
@@ -2054,6 +2108,7 @@ static int mdt_seq_init_cli(const struct lu_context *ctx,
 static void mdt_seq_fini_cli(struct mdt_device *m)
 {
         struct lu_site *ls;
+        int rc;
 
         ENTRY;
 
@@ -2063,20 +2118,13 @@ static void mdt_seq_fini_cli(struct mdt_device *m)
                 seq_server_set_cli(ls->ls_server_seq,
                                    NULL, NULL);
 
-        if (ls && ls->ls_client_seq) {
-                seq_client_fini(ls->ls_client_seq);
-                OBD_FREE_PTR(ls->ls_client_seq);
-                ls->ls_client_seq = NULL;
-        }
-
         if (ls && ls->ls_client_exp) {
-                int rc = obd_disconnect(ls->ls_client_exp);
-                ls->ls_client_exp = NULL;
-
+                rc = obd_disconnect(ls->ls_client_exp);
                 if (rc) {
                         CERROR("failure to disconnect "
                                "obd: %d\n", rc);
                 }
+                ls->ls_client_exp = NULL;
         }
         EXIT;
 }
@@ -2084,43 +2132,72 @@ static void mdt_seq_fini_cli(struct mdt_device *m)
 /*
  * FLD wrappers
  */
-static int mdt_fld_init(const struct lu_context *ctx,
-                        const char *uuid,
+static int mdt_fld_fini(const struct lu_context *ctx,
                         struct mdt_device *m)
 {
-        struct lu_site *ls;
-        int rc;
+        struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
         ENTRY;
 
-        ls = m->mdt_md_dev.md_lu_dev.ld_site;
-
-        OBD_ALLOC_PTR(ls->ls_server_fld);
+        if (ls && ls->ls_server_fld) {
+                fld_server_fini(ls->ls_server_fld, ctx);
+                OBD_FREE_PTR(ls->ls_server_fld);
+                ls->ls_server_fld = NULL;
+        }
 
-        if (ls->ls_server_fld != NULL) {
-                rc = fld_server_init(ls->ls_server_fld, ctx,
-                                     m->mdt_bottom, uuid);
-                if (rc) {
-                        OBD_FREE_PTR(ls->ls_server_fld);
-                        ls->ls_server_fld = NULL;
-                }
-        } else
-                rc = -ENOMEM;
+        if (ls && ls->ls_client_fld != NULL) {
+                fld_client_fini(ls->ls_client_fld);
+                OBD_FREE_PTR(ls->ls_client_fld);
+                ls->ls_client_fld = NULL;
+        }
 
-        RETURN(rc);
+        RETURN(0);
 }
 
-static int mdt_fld_fini(const struct lu_context *ctx,
+static int mdt_fld_init(const struct lu_context *ctx,
+                        const char *uuid,
                         struct mdt_device *m)
 {
-        struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
+        struct lu_fld_target target;
+        struct lu_site *ls;
+        int rc;
         ENTRY;
 
-        if (ls && ls->ls_server_fld) {
-                fld_server_fini(ls->ls_server_fld, ctx);
+        ls = m->mdt_md_dev.md_lu_dev.ld_site;
+
+        OBD_ALLOC_PTR(ls->ls_server_fld);
+        if (ls->ls_server_fld == NULL)
+                RETURN(rc = -ENOMEM);
+
+        rc = fld_server_init(ls->ls_server_fld, ctx,
+                             m->mdt_bottom, uuid);
+        if (rc) {
                 OBD_FREE_PTR(ls->ls_server_fld);
                 ls->ls_server_fld = NULL;
         }
-        RETURN(0);
+
+        OBD_ALLOC_PTR(ls->ls_client_fld);
+        if (!ls->ls_client_fld)
+                GOTO(out_fld_fini, rc = -ENOMEM);
+
+        rc = fld_client_init(ls->ls_client_fld, uuid,
+                             LUSTRE_CLI_FLD_HASH_DHT,
+                             ctx);
+        if (rc) {
+                CERROR("can't init FLD, err %d\n",  rc);        
+                OBD_FREE_PTR(ls->ls_client_fld);
+                GOTO(out_fld_fini, rc);
+        }
+
+        target.ft_srv = ls->ls_server_fld;
+        target.ft_idx = ls->ls_node_id;
+        target.ft_exp = NULL;
+        
+        fld_client_add_target(ls->ls_client_fld, &target);
+        EXIT;
+out_fld_fini:
+        if (rc)
+                mdt_fld_fini(ctx, m);
+        return rc;
 }
 
 /* device init/fini methods */
index 0270de8..b2b888a 100644 (file)
@@ -1406,7 +1406,8 @@ out:
         // for_all_existing_mdt except current one
         for (i = 0; i < INDEX_MAP_SIZE * 8; i++){
                 char *mdtname;
-                if (test_bit(i,  fsdb->fsdb_mdt_index_map)) {
+                if (i !=  mti->mti_stripe_index &&
+                    test_bit(i,  fsdb->fsdb_mdt_index_map)) {
                         sprintf(mdt_index,"-MDT%04x",i);
                         
                         name_create(&mdtname, mti->mti_fsname, mdt_index);
index 4fa872b..86e8503 100644 (file)
@@ -1246,7 +1246,13 @@ static int mkfs_mdt(struct mkfs_opts *mop)
                 goto out_rmdir;
         }
 
-        snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "seq");
+        snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "seq_ctl");
+        ret = touch_file(filepnm);
+        if (ret) {
+                goto out_umount;
+        }
+
+        snprintf(filepnm, sizeof(filepnm) - 1, "%s/%s", mntpt, "seq_srv");
         ret = touch_file(filepnm);
         if (ret) {
                 goto out_umount;