Whamcloud - gitweb
LU-1445 ofd: Add fid support on OFD
authorwangdi <di.wang@whamcloud.com>
Tue, 24 Sep 2013 09:59:57 +0000 (02:59 -0700)
committerOleg Drokin <green@whamcloud.com>
Mon, 14 Jan 2013 14:00:41 +0000 (09:00 -0500)
1. Init fid server related structure in OFD, which will
become a sequence metaserver, i.e. MDT will request sequence
from OST(OFD), OFD will request super sequence from MDT0

2. OST FID sequence width will be 4 billion, which is different
with MDS FID.

Change-Id: If861cf5814ea77ce72464525a8de71ac182d742c
Signed-off-by: Wang Di <di.wang@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/4326
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
lustre/fid/fid_handler.c
lustre/fid/fid_request.c
lustre/fid/lproc_fid.c
lustre/include/lustre_fid.h
lustre/mdt/mdt_handler.c
lustre/obdecho/echo_client.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_fs.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_obd.c

index d4c2611..e36935f 100644 (file)
@@ -80,11 +80,11 @@ int seq_server_set_cli(struct lu_server_seq *seq,
                 GOTO(out_up, rc = 0);
         }
 
-        if (seq->lss_cli != NULL) {
-                CERROR("%s: Sequence controller is already "
-                       "assigned\n", seq->lss_name);
-                GOTO(out_up, rc = -EINVAL);
-        }
+       if (seq->lss_cli != NULL) {
+               CDEBUG(D_HA, "%s: Sequence controller is already "
+                      "assigned\n", seq->lss_name);
+               GOTO(out_up, rc = -EEXIST);
+       }
 
         CDEBUG(D_INFO, "%s: Attached sequence controller %s\n",
                seq->lss_name, cli->lcs_name);
@@ -365,7 +365,7 @@ static int seq_req_handle(struct ptlrpc_request *req,
 LU_KEY_INIT_FINI(seq, struct seq_thread_info);
 
 /* context key: seq_thread_key */
-LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD);
+LU_CONTEXT_KEY_DEFINE(seq, LCT_MD_THREAD | LCT_DT_THREAD);
 
 static void seq_thread_info_init(struct ptlrpc_request *req,
                                  struct seq_thread_info *info)
@@ -557,6 +557,33 @@ void seq_server_fini(struct lu_server_seq *seq,
 }
 EXPORT_SYMBOL(seq_server_fini);
 
+int seq_site_fini(const struct lu_env *env, struct seq_server_site *ss)
+{
+       if (ss == NULL)
+               RETURN(0);
+
+       if (ss->ss_server_seq) {
+               seq_server_fini(ss->ss_server_seq, env);
+               OBD_FREE_PTR(ss->ss_server_seq);
+               ss->ss_server_seq = NULL;
+       }
+
+       if (ss->ss_control_seq) {
+               seq_server_fini(ss->ss_control_seq, env);
+               OBD_FREE_PTR(ss->ss_control_seq);
+               ss->ss_control_seq = NULL;
+       }
+
+       if (ss->ss_client_seq) {
+               seq_client_fini(ss->ss_client_seq);
+               OBD_FREE_PTR(ss->ss_client_seq);
+               ss->ss_client_seq = NULL;
+       }
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(seq_site_fini);
+
 cfs_proc_dir_entry_t *seq_type_proc_dir = NULL;
 
 static int __init fid_mod_init(void)
index 7bfcc57..413d0f5 100644 (file)
@@ -278,11 +278,11 @@ int seq_client_get_seq(const struct lu_env *env,
         CDEBUG(D_INFO, "%s: allocate sequence "
                "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr);
 
-        /*Since the caller require the whole seq,
-         *so marked this seq to be used*/
-        seq->lcs_fid.f_oid = LUSTRE_SEQ_MAX_WIDTH;
-        seq->lcs_fid.f_seq = *seqnr;
-        seq->lcs_fid.f_ver = 0;
+       /*Since the caller require the whole seq,
+        *so marked this seq to be used*/
+       seq->lcs_fid.f_oid = LUSTRE_METADATA_SEQ_MAX_WIDTH;
+       seq->lcs_fid.f_seq = *seqnr;
+       seq->lcs_fid.f_ver = 0;
 
         /*
          * Inform caller that sequence switch is performed to allow it
@@ -460,36 +460,37 @@ int seq_client_init(struct lu_client_seq *seq,
                     const char *prefix,
                     struct lu_server_seq *srv)
 {
-        int rc;
-        ENTRY;
+       int rc;
+       ENTRY;
 
-        LASSERT(seq != NULL);
-        LASSERT(prefix != NULL);
+       LASSERT(seq != NULL);
+       LASSERT(prefix != NULL);
+
+       seq->lcs_srv = srv;
+       seq->lcs_type = type;
 
-        seq->lcs_exp = exp;
-        seq->lcs_srv = srv;
-        seq->lcs_type = type;
        mutex_init(&seq->lcs_mutex);
-        seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
-        cfs_waitq_init(&seq->lcs_waitq);
+       if (type == LUSTRE_SEQ_METADATA)
+               seq->lcs_width = LUSTRE_METADATA_SEQ_MAX_WIDTH;
+       else
+               seq->lcs_width = LUSTRE_DATA_SEQ_MAX_WIDTH;
 
-        /* Make sure that things are clear before work is started. */
-        seq_client_flush(seq);
+       cfs_waitq_init(&seq->lcs_waitq);
+       /* Make sure that things are clear before work is started. */
+       seq_client_flush(seq);
 
-        if (exp == NULL) {
-                LASSERT(seq->lcs_srv != NULL);
-        } else {
-                LASSERT(seq->lcs_exp != NULL);
-                seq->lcs_exp = class_export_get(seq->lcs_exp);
-        }
+       if (exp != NULL)
+               seq->lcs_exp = class_export_get(exp);
+       else if (type == LUSTRE_SEQ_METADATA)
+               LASSERT(seq->lcs_srv != NULL);
 
-        snprintf(seq->lcs_name, sizeof(seq->lcs_name),
-                 "cli-%s", prefix);
+       snprintf(seq->lcs_name, sizeof(seq->lcs_name),
+                "cli-%s", prefix);
 
-        rc = seq_client_proc_init(seq);
-        if (rc)
-                seq_client_fini(seq);
-        RETURN(rc);
+       rc = seq_client_proc_init(seq);
+       if (rc)
+               seq_client_fini(seq);
+       RETURN(rc);
 }
 EXPORT_SYMBOL(seq_client_init);
 
index 664bc94..3bb44f2 100644 (file)
@@ -275,14 +275,14 @@ seq_client_proc_write_width(struct file *file, const char *buffer,
                 RETURN(rc);
         }
 
-        if (val <= LUSTRE_SEQ_MAX_WIDTH && val > 0) {
-                seq->lcs_width = val;
+       if (val <= LUSTRE_METADATA_SEQ_MAX_WIDTH && val > 0) {
+               seq->lcs_width = val;
 
-                if (rc == 0) {
-                        CDEBUG(D_INFO, "%s: Sequence size: "LPU64"\n",
-                               seq->lcs_name, seq->lcs_width);
-                }
-        }
+               if (rc == 0) {
+                       CDEBUG(D_INFO, "%s: Sequence size: "LPU64"\n",
+                              seq->lcs_name, seq->lcs_width);
+               }
+       }
 
        mutex_unlock(&seq->lcs_mutex);
 
index 0275400..99b88be 100644 (file)
@@ -169,26 +169,31 @@ extern const struct lu_fid LU_OBF_FID;
 extern const struct lu_fid LU_DOT_LUSTRE_FID;
 
 enum {
-        /*
-         * This is how may FIDs may be allocated in one sequence(128k)
-         */
-        LUSTRE_SEQ_MAX_WIDTH = 0x0000000000020000ULL,
+       /*
+        * This is how may metadata FIDs may be allocated in one sequence(128k)
+        */
+       LUSTRE_METADATA_SEQ_MAX_WIDTH = 0x0000000000020000ULL,
 
-        /*
-         * How many sequences to allocate to a client at once.
-         */
-        LUSTRE_SEQ_META_WIDTH = 0x0000000000000001ULL,
+       /*
+        * This is how many data FIDs could be allocated in one sequence(4B - 1)
+        */
+       LUSTRE_DATA_SEQ_MAX_WIDTH = 0x00000000FFFFFFFFULL,
 
-         /*
-          * seq allocation pool size.
-          */
-        LUSTRE_SEQ_BATCH_WIDTH = LUSTRE_SEQ_META_WIDTH * 1000,
+       /*
+        * How many sequences to allocate to a client at once.
+        */
+       LUSTRE_SEQ_META_WIDTH = 0x0000000000000001ULL,
 
-        /*
-         * This is how many sequences may be in one super-sequence allocated to
-         * MDTs.
-         */
-        LUSTRE_SEQ_SUPER_WIDTH = ((1ULL << 30ULL) * LUSTRE_SEQ_META_WIDTH)
+       /*
+        * seq allocation pool size.
+        */
+       LUSTRE_SEQ_BATCH_WIDTH = LUSTRE_SEQ_META_WIDTH * 1000,
+
+       /*
+        * This is how many sequences may be in one super-sequence allocated to
+        * MDTs.
+        */
+       LUSTRE_SEQ_SUPER_WIDTH = ((1ULL << 30ULL) * LUSTRE_SEQ_META_WIDTH)
 };
 
 enum {
@@ -427,6 +432,7 @@ int seq_client_alloc_fid(const struct lu_env *env, struct lu_client_seq *seq,
 int seq_client_get_seq(const struct lu_env *env, struct lu_client_seq *seq,
                        seqno_t *seqnr);
 
+int seq_site_fini(const struct lu_env *env, struct seq_server_site *ss);
 /* Fids common stuff */
 int fid_is_local(const struct lu_env *env,
                  struct lu_site *site, const struct lu_fid *fid);
index 53cb6ae..a44945c 100644 (file)
@@ -3753,31 +3753,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
 static int mdt_seq_fini(const struct lu_env *env,
                         struct mdt_device *m)
 {
-       struct seq_server_site *ss = mdt_seq_site(m);
-       ENTRY;
-
-       if (ss == NULL)
-               RETURN(0);
-
-       if (ss->ss_server_seq) {
-               seq_server_fini(ss->ss_server_seq, env);
-               OBD_FREE_PTR(ss->ss_server_seq);
-               ss->ss_server_seq = NULL;
-       }
-
-       if (ss->ss_control_seq) {
-               seq_server_fini(ss->ss_control_seq, env);
-               OBD_FREE_PTR(ss->ss_control_seq);
-               ss->ss_control_seq = NULL;
-       }
-
-       if (ss->ss_client_seq) {
-               seq_client_fini(ss->ss_client_seq);
-               OBD_FREE_PTR(ss->ss_client_seq);
-               ss->ss_client_seq = NULL;
-       }
-
-       RETURN(0);
+       return seq_site_fini(env, mdt_seq_site(m));
 }
 
 static int mdt_seq_init(const struct lu_env *env,
index f1b4f9e..6250aa6 100644 (file)
@@ -2876,11 +2876,11 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 if (cfs_copy_to_user(data->ioc_pbuf1, &seq, data->ioc_plen1))
                         return -EFAULT;
 
-                max_count = LUSTRE_SEQ_MAX_WIDTH;
-                if (cfs_copy_to_user(data->ioc_pbuf2, &max_count,
-                                     data->ioc_plen2))
-                        return -EFAULT;
-                GOTO(out, rc);
+               max_count = LUSTRE_METADATA_SEQ_MAX_WIDTH;
+               if (cfs_copy_to_user(data->ioc_pbuf2, &max_count,
+                                    data->ioc_plen2))
+                       return -EFAULT;
+               GOTO(out, rc);
         }
         case OBD_IOC_DESTROY:
                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
index e8c809f..d12729a 100644 (file)
@@ -44,6 +44,7 @@
 
 #include <obd_class.h>
 #include <lustre_param.h>
+#include <lustre_fid.h>
 
 #include "ofd_internal.h"
 
@@ -479,6 +480,82 @@ static int ofd_procfs_fini(struct ofd_device *ofd)
 
 extern int ost_handle(struct ptlrpc_request *req);
 
+int ofd_fid_fini(const struct lu_env *env, struct ofd_device *ofd)
+{
+       return seq_site_fini(env, &ofd->ofd_seq_site);
+}
+
+int ofd_fid_init(const struct lu_env *env, struct ofd_device *ofd)
+{
+       struct seq_server_site  *ss = &ofd->ofd_seq_site;
+       struct lu_device        *lu = &ofd->ofd_dt_dev.dd_lu_dev;
+       char                    *obd_name = ofd_name(ofd);
+       char                    *name = NULL;
+       int                     rc = 0;
+
+       ss = &ofd->ofd_seq_site;
+       lu->ld_site->ld_seq_site = ss;
+       ss->ss_lu = lu->ld_site;
+       ss->ss_node_id = ofd->ofd_lut.lut_lsd.lsd_osd_index;
+
+       OBD_ALLOC_PTR(ss->ss_server_seq);
+       if (ss->ss_server_seq == NULL)
+               GOTO(out_free, rc = -ENOMEM);
+
+       OBD_ALLOC(name, strlen(obd_name) + 10);
+       if (!name) {
+               OBD_FREE_PTR(ss->ss_server_seq);
+               ss->ss_server_seq = NULL;
+               GOTO(out_free, rc = -ENOMEM);
+       }
+
+       rc = seq_server_init(ss->ss_server_seq, ofd->ofd_osd, obd_name,
+                            LUSTRE_SEQ_SERVER, ss, env);
+       if (rc) {
+               CERROR("%s : seq server init error %d\n", obd_name, rc);
+               GOTO(out_free, rc);
+       }
+       ss->ss_server_seq->lss_space.lsr_index = ss->ss_node_id;
+
+       OBD_ALLOC_PTR(ss->ss_client_seq);
+       if (ss->ss_client_seq == NULL)
+               GOTO(out_free, -ENOMEM);
+
+       snprintf(name, strlen(obd_name) + 6, "%p-super", obd_name);
+       rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_DATA,
+                            name, NULL);
+       if (rc) {
+               CERROR("%s : seq client init error %d\n", obd_name, rc);
+               GOTO(out_free, rc);
+       }
+       OBD_FREE(name, strlen(obd_name) + 10);
+       name = NULL;
+
+       rc = seq_server_set_cli(ss->ss_server_seq, ss->ss_client_seq, env);
+
+out_free:
+       if (rc) {
+               if (ss->ss_server_seq) {
+                       seq_server_fini(ss->ss_server_seq, env);
+                       OBD_FREE_PTR(ss->ss_server_seq);
+                       ss->ss_server_seq = NULL;
+               }
+
+               if (ss->ss_client_seq) {
+                       seq_client_fini(ss->ss_client_seq);
+                       OBD_FREE_PTR(ss->ss_client_seq);
+                       ss->ss_client_seq = NULL;
+               }
+
+               if (name) {
+                       OBD_FREE(name, strlen(obd_name) + 10);
+                       name = NULL;
+               }
+       }
+
+       return rc;
+}
+
 static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
                     struct lu_device_type *ldt, struct lustre_cfg *cfg)
 {
index e85ba4d..d7fd08d 100644 (file)
@@ -189,11 +189,28 @@ int ofd_seq_last_oid_write(const struct lu_env *env, struct ofd_device *ofd,
        RETURN(rc);
 }
 
+static void ofd_deregister_seq_exp(struct ofd_device *ofd)
+{
+       struct seq_server_site  *ss = &ofd->ofd_seq_site;
+
+       if (ss->ss_client_seq != NULL) {
+               lustre_deregister_osp_item(&ss->ss_client_seq->lcs_exp);
+               ss->ss_client_seq->lcs_exp = NULL;
+       }
+}
+
 void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
 {
        struct ofd_seq  *oseq;
        struct ofd_seq  *tmp;
        cfs_list_t       dispose;
+       int             rc;
+
+       ofd_deregister_seq_exp(ofd);
+
+       rc = ofd_fid_fini(env, ofd);
+       if (rc != 0)
+               CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc);
 
        CFS_INIT_LIST_HEAD(&dispose);
        write_lock(&ofd->ofd_seq_list_lock);
@@ -288,13 +305,50 @@ cleanup:
        return ERR_PTR(rc);
 }
 
+static int ofd_register_seq_exp(struct ofd_device *ofd)
+{
+       struct seq_server_site  *ss = &ofd->ofd_seq_site;
+       char                    *osp_name = NULL;
+       int                     rc;
+
+       OBD_ALLOC(osp_name, MAX_OBD_NAME);
+       if (osp_name == NULL)
+               GOTO(out_free, rc = -ENOMEM);
+
+       rc = tgt_name2ospname(ofd_name(ofd), osp_name);
+       if (rc != 0)
+               GOTO(out_free, rc);
+
+       rc = lustre_register_osp_item(osp_name, &ss->ss_client_seq->lcs_exp,
+                                     NULL, NULL);
+out_free:
+       if (osp_name != NULL)
+               OBD_FREE(osp_name, MAX_OBD_NAME);
+
+       return rc;
+}
+
 /* object sequence management */
 int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd)
 {
+       int rc;
+
+       rc = ofd_fid_init(env, ofd);
+       if (rc != 0) {
+               CERROR("%s: fid init error: rc = %d\n", ofd_name(ofd), rc);
+               return rc;
+       }
+
+       rc = ofd_register_seq_exp(ofd);
+       if (rc) {
+               CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc);
+               return rc;
+       }
+
        rwlock_init(&ofd->ofd_seq_list_lock);
        CFS_INIT_LIST_HEAD(&ofd->ofd_seq_list);
        ofd->ofd_seq_count = 0;
-       return 0;
+       return rc;
 }
 
 int ofd_clients_data_init(const struct lu_env *env, struct ofd_device *ofd,
index 128bcfc..d346990 100644 (file)
@@ -185,6 +185,7 @@ struct ofd_device {
                                 /* shall we grant space to clients not
                                  * supporting OBD_CONNECT_GRANT_PARAM? */
                                 ofd_grant_compat_disable:1;
+       struct seq_server_site   ofd_seq_site;
 };
 
 static inline struct ofd_device *ofd_dev(struct lu_device *d)
@@ -488,6 +489,12 @@ void ofd_fmd_drop(struct obd_export *exp, struct lu_fid *fid);
 #define ofd_fmd_drop(exp, fid) do {} while (0)
 #endif
 
+/* ofd_dev.c */
+int ofd_fid_set_index(const struct lu_env *env, struct ofd_device *ofd,
+                     int index);
+int ofd_fid_init(const struct lu_env *env, struct ofd_device *ofd);
+int ofd_fid_fini(const struct lu_env *env, struct ofd_device *ofd);
+
 /* ofd_lvb.c */
 extern struct ldlm_valblock_ops ofd_lvbo;
 
index 3b16bc9..b4dd026 100644 (file)
@@ -1186,8 +1186,11 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
                                ofd_obd(ofd)->obd_name);
                        GOTO(out, rc = 0);
                }
-               /* only precreate if seq == 0 and o_id is specfied */
-               if (!fid_seq_is_mdt(oa->o_seq) || oa->o_id == 0) {
+               /* only precreate if seq is 0, IDIF or normal and also o_id
+                * must be specfied */
+               if ((!fid_seq_is_mdt(oa->o_seq) &&
+                    !fid_seq_is_norm(oa->o_seq) &&
+                    !fid_seq_is_idif(oa->o_seq)) || oa->o_id == 0) {
                        diff = 1; /* shouldn't we create this right now? */
                } else {
                        diff = oa->o_id - ofd_seq_last_oid(oseq);