Whamcloud - gitweb
LU-3529 osp: init FID client for OSP on MDT. 58/7158/19
authorwang di <di.wang@intel.com>
Mon, 12 May 2014 05:33:17 +0000 (22:33 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 13 Jan 2014 18:56:37 +0000 (18:56 +0000)
1. Initialize FID client for OSP on MDT.
2. Pull out OSP precreate stuff into a separate structure,
so OSP on MDT does not have to allocate precreate.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I3c4a87e2fbea2a733fc6a4a296a50ba67652aa32
Reviewed-on: http://review.whamcloud.com/7158
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Tested-by: Jenkins
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Tested-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/fid/fid_request.c
lustre/osp/lproc_osp.c
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_precreate.c
lustre/osp/osp_sync.c

index 544b6ce..14de931 100644 (file)
@@ -104,10 +104,14 @@ static int seq_client_rpc(struct lu_client_seq *seq,
                        req->rq_no_delay = req->rq_no_resend = 1;
                debug_mask = D_CONSOLE;
        } else {
-               if (seq->lcs_type == LUSTRE_SEQ_METADATA)
+               if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
+                       req->rq_reply_portal = MDC_REPLY_PORTAL;
                        req->rq_request_portal = SEQ_METADATA_PORTAL;
-               else
+               } else {
+                       req->rq_reply_portal = OSC_REPLY_PORTAL;
                        req->rq_request_portal = SEQ_DATA_PORTAL;
+               }
+
                debug_mask = D_INFO;
        }
 
index 22e3372..75ebeb1 100644 (file)
@@ -201,7 +201,7 @@ static int osp_rd_create_count(char *page, char **start, off_t off, int count,
        struct obd_device *obd = data;
        struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
 
-       if (osp == NULL)
+       if (osp == NULL || osp->opd_pre == NULL)
                return 0;
 
        return snprintf(page, count, "%d\n", osp->opd_pre_grow_count);
@@ -214,7 +214,7 @@ static int osp_wr_create_count(struct file *file, const char *buffer,
        struct osp_device       *osp = lu2osp_dev(obd->obd_lu_dev);
        int                      val, rc, i;
 
-       if (osp == NULL)
+       if (osp == NULL || osp->opd_pre == NULL)
                return 0;
 
        rc = lprocfs_write_helper(buffer, count, &val);
@@ -249,7 +249,7 @@ static int osp_rd_max_create_count(char *page, char **start, off_t off,
        struct obd_device *obd = data;
        struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
 
-       if (osp == NULL)
+       if (osp == NULL || osp->opd_pre == NULL)
                return 0;
 
        return snprintf(page, count, "%d\n", osp->opd_pre_max_grow_count);
@@ -262,7 +262,7 @@ static int osp_wr_max_create_count(struct file *file, const char *buffer,
        struct osp_device       *osp = lu2osp_dev(obd->obd_lu_dev);
        int                      val, rc;
 
-       if (osp == NULL)
+       if (osp == NULL || osp->opd_pre == NULL)
                return 0;
 
        rc = lprocfs_write_helper(buffer, count, &val);
@@ -288,7 +288,7 @@ static int osp_rd_prealloc_next_id(char *page, char **start, off_t off,
        struct obd_device *obd = data;
        struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
 
-       if (osp == NULL)
+       if (osp == NULL || osp->opd_pre == NULL)
                return 0;
 
        return snprintf(page, count, "%u\n",
@@ -301,7 +301,7 @@ static int osp_rd_prealloc_last_id(char *page, char **start, off_t off,
        struct obd_device *obd = data;
        struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
 
-       if (osp == NULL)
+       if (osp == NULL || osp->opd_pre == NULL)
                return 0;
 
        return snprintf(page, count, "%u\n",
@@ -314,7 +314,7 @@ static int osp_rd_prealloc_next_seq(char *page, char **start, off_t off,
        struct obd_device *obd = data;
        struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
 
-       if (osp == NULL)
+       if (osp == NULL || osp->opd_pre == NULL)
                return 0;
 
        return snprintf(page, count, LPX64"\n",
@@ -327,7 +327,7 @@ static int osp_rd_prealloc_last_seq(char *page, char **start, off_t off,
        struct obd_device *obd = data;
        struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
 
-       if (osp == NULL)
+       if (osp == NULL || osp->opd_pre == NULL)
                return 0;
 
        return snprintf(page, count, LPX64"\n",
@@ -340,7 +340,7 @@ static int osp_rd_prealloc_reserved(char *page, char **start, off_t off,
        struct obd_device *obd = data;
        struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
 
-       if (osp == NULL)
+       if (osp == NULL || osp->opd_pre == NULL)
                return 0;
 
        return snprintf(page, count, LPU64"\n", osp->opd_pre_reserved);
@@ -389,7 +389,7 @@ static int osp_rd_pre_status(char *page, char **start, off_t off,
        struct osp_device       *osp = lu2osp_dev(dev->obd_lu_dev);
        int                      rc;
 
-       if (osp == NULL)
+       if (osp == NULL || osp->opd_pre == NULL)
                return -EINVAL;
 
        rc = snprintf(page, count, "%d\n", osp->opd_pre_status);
index 096de6a..d246678 100644 (file)
@@ -421,7 +421,8 @@ static int osp_recovery_complete(const struct lu_env *env,
 
        ENTRY;
        osp->opd_recovery_completed = 1;
-       if (!osp->opd_connect_mdt)
+
+       if (!osp->opd_connect_mdt && osp->opd_pre != NULL)
                wake_up(&osp->opd_pre_waitq);
        RETURN(rc);
 }
@@ -446,6 +447,9 @@ static int osp_statfs(const struct lu_env *env, struct dt_device *dev,
        if (unlikely(d->opd_imp_active == 0))
                RETURN(-ENOTCONN);
 
+       if (d->opd_pre == NULL)
+               RETURN(0);
+
        /* return recently updated data */
        *sfs = d->opd_statfs;
 
@@ -453,7 +457,6 @@ static int osp_statfs(const struct lu_env *env, struct dt_device *dev,
         * layer above osp (usually lod) can use ffree to estimate
         * how many objects are available for immediate creation
         */
-
        spin_lock(&d->opd_pre_lock);
        LASSERTF(fid_seq(&d->opd_pre_last_created_fid) ==
                 fid_seq(&d->opd_pre_used_fid),
@@ -687,6 +690,14 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m,
 
        osp_lprocfs_init(m);
 
+       rc = obd_fid_init(m->opd_obd, NULL, m->opd_connect_mdt ?
+                         LUSTRE_SEQ_METADATA : LUSTRE_SEQ_DATA);
+       if (rc) {
+               CERROR("%s: fid init error: rc = %d\n",
+                      m->opd_obd->obd_name, rc);
+               GOTO(out_proc, rc);
+       }
+
        if (!m->opd_connect_mdt) {
                /* Initialize last id from the storage - will be
                 * used in orphan cleanup. */
@@ -694,12 +705,6 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m,
                if (rc)
                        GOTO(out_proc, rc);
 
-               rc = obd_fid_init(m->opd_obd, NULL, LUSTRE_SEQ_DATA);
-               if (rc) {
-                       CERROR("%s: fid init error: rc = %d\n",
-                              m->opd_obd->obd_name, rc);
-                       GOTO(out_last_used, rc);
-               }
 
                /* Initialize precreation thread, it handles new
                 * connections as well. */
@@ -714,8 +719,8 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m,
                rc = osp_sync_init(env, m);
                if (rc)
                        GOTO(out_precreat, rc);
-
        }
+
        /*
         * Initiate connect to OST
         */
@@ -856,6 +861,33 @@ static int osp_reconnect(const struct lu_env *env,
        return 0;
 }
 
+static int osp_prepare_fid_client(struct osp_device *osp)
+{
+       int rc;
+
+       LASSERT(osp->opd_obd->u.cli.cl_seq != NULL);
+       if (osp->opd_obd->u.cli.cl_seq->lcs_exp != NULL)
+               return 0;
+
+       LASSERT(osp->opd_exp != NULL);
+       osp->opd_obd->u.cli.cl_seq->lcs_exp =
+                               class_export_get(osp->opd_exp);
+       if (osp->opd_pre == NULL)
+               return 0;
+
+       /* Init fid for osp_precreate if necessary */
+       rc = osp_init_pre_fid(osp);
+       if (rc != 0) {
+               class_export_put(osp->opd_exp);
+               osp->opd_obd->u.cli.cl_seq->lcs_exp = NULL;
+               CERROR("%s: init pre fid error: rc = %d\n",
+                      osp->opd_obd->obd_name, rc);
+               return rc;
+       }
+
+       return 0;
+}
+
 /*
  * we use exports to track all LOD users
  */
@@ -1014,27 +1046,42 @@ static int osp_import_event(struct obd_device *obd, struct obd_import *imp,
                d->opd_imp_connected = 0;
                if (d->opd_connect_mdt)
                        break;
-               osp_pre_update_status(d, -ENODEV);
-               wake_up(&d->opd_pre_waitq);
+
+               if (d->opd_pre != NULL) {
+                       osp_pre_update_status(d, -ENODEV);
+                       wake_up(&d->opd_pre_waitq);
+               }
+
                CDEBUG(D_HA, "got disconnected\n");
                break;
        case IMP_EVENT_INACTIVE:
                d->opd_imp_active = 0;
                if (d->opd_connect_mdt)
                        break;
-               osp_pre_update_status(d, -ENODEV);
-               wake_up(&d->opd_pre_waitq);
+
+               if (d->opd_pre != NULL) {
+                       osp_pre_update_status(d, -ENODEV);
+                       wake_up(&d->opd_pre_waitq);
+               }
+
                CDEBUG(D_HA, "got inactive\n");
                break;
        case IMP_EVENT_ACTIVE:
                d->opd_imp_active = 1;
+
+               if (osp_prepare_fid_client(d) != 0)
+                       break;
+
                if (d->opd_got_disconnected)
                        d->opd_new_connection = 1;
                d->opd_imp_connected = 1;
                d->opd_imp_seen_connected = 1;
                if (d->opd_connect_mdt)
                        break;
-               wake_up(&d->opd_pre_waitq);
+
+               if (d->opd_pre != NULL)
+                       wake_up(&d->opd_pre_waitq);
+
                __osp_sync_check_for_work(d);
                CDEBUG(D_HA, "got connected\n");
                break;
index 92c7b72..025a249 100644 (file)
@@ -65,6 +65,34 @@ struct osp_id_tracker {
        cfs_atomic_t             otr_refcount;
 };
 
+struct osp_precreate {
+       /*
+        * Precreation pool
+        */
+       spinlock_t                       osp_pre_lock;
+
+       /* last fid to assign in creation */
+       struct lu_fid                    osp_pre_used_fid;
+       /* last created id OST reported, next-created - available id's */
+       struct lu_fid                    osp_pre_last_created_fid;
+       /* how many ids are reserved in declare, we shouldn't block in create */
+       __u64                            osp_pre_reserved;
+       /* thread waits for signals about pool going empty */
+       wait_queue_head_t                osp_pre_waitq;
+       /* consumers (who needs new ids) wait here */
+       wait_queue_head_t                osp_pre_user_waitq;
+       /* current precreation status: working, failed, stopping? */
+       int                              osp_pre_status;
+       /* how many to precreate next time */
+       int                              osp_pre_grow_count;
+       int                              osp_pre_min_grow_count;
+       int                              osp_pre_max_grow_count;
+       /* whether to grow precreation window next time or not */
+       int                              osp_pre_grow_slow;
+       /* cleaning up orphans or recreating missing objects */
+       int                              osp_pre_recovering;
+};
+
 struct osp_device {
        struct dt_device                 opd_dt_dev;
        /* corresponded OST index */
@@ -105,33 +133,10 @@ struct osp_device {
         * reported via ->ldo_recovery_complete() */
        int                              opd_recovery_completed;
 
-       /*
-        * Precreation pool
-        */
-       spinlock_t                       opd_pre_lock;
-
-       /* last fid to assign in creation */
-       struct lu_fid                    opd_pre_used_fid;
-       /* last created id OST reported, next-created - available id's */
-       struct lu_fid                    opd_pre_last_created_fid;
-       /* how many ids are reserved in declare, we shouldn't block in create */
-       __u64                            opd_pre_reserved;
+       /* precreate structure for OSP */
+       struct osp_precreate            *opd_pre;
        /* dedicate precreate thread */
        struct ptlrpc_thread             opd_pre_thread;
-       /* thread waits for signals about pool going empty */
-       wait_queue_head_t                opd_pre_waitq;
-       /* consumers (who needs new ids) wait here */
-       wait_queue_head_t                opd_pre_user_waitq;
-       /* current precreation status: working, failed, stopping? */
-       int                              opd_pre_status;
-       /* how many to precreate next time */
-       int                              opd_pre_grow_count;
-       int                              opd_pre_min_grow_count;
-       int                              opd_pre_max_grow_count;
-       /* whether to grow precreation window next time or not */
-       int                              opd_pre_grow_slow;
-       /* cleaning up orphans or recreating missing objects */
-       int                              opd_pre_recovering;
 
        /*
         * OST synchronization
@@ -181,6 +186,19 @@ struct osp_device {
        cfs_proc_dir_entry_t            *opd_symlink;
 };
 
+#define opd_pre_lock                   opd_pre->osp_pre_lock
+#define opd_pre_used_fid               opd_pre->osp_pre_used_fid
+#define opd_pre_last_created_fid       opd_pre->osp_pre_last_created_fid
+#define opd_pre_reserved               opd_pre->osp_pre_reserved
+#define opd_pre_waitq                  opd_pre->osp_pre_waitq
+#define opd_pre_user_waitq             opd_pre->osp_pre_user_waitq
+#define opd_pre_status                 opd_pre->osp_pre_status
+#define opd_pre_grow_count             opd_pre->osp_pre_grow_count
+#define opd_pre_min_grow_count         opd_pre->osp_pre_min_grow_count
+#define opd_pre_max_grow_count         opd_pre->osp_pre_max_grow_count
+#define opd_pre_grow_slow              opd_pre->osp_pre_grow_slow
+#define opd_pre_recovering             opd_pre->osp_pre_recovering
+
 extern struct kmem_cache *osp_object_kmem;
 
 /* this is a top object */
@@ -413,6 +431,7 @@ void osp_statfs_need_now(struct osp_device *d);
 int osp_reset_last_used(const struct lu_env *env, struct osp_device *osp);
 int osp_write_last_oid_seq_files(struct lu_env *env, struct osp_device *osp,
                                 struct lu_fid *fid, int sync);
+int osp_init_pre_fid(struct osp_device *osp);
 
 /* lproc_osp.c */
 void lprocfs_osp_init_vars(struct lprocfs_static_vars *lvars);
index b66a66a..c790581 100644 (file)
@@ -340,7 +340,6 @@ static int osp_md_declare_object_create(const struct lu_env *env,
        int                     buf_count;
        int                     rc;
 
-
        update = osp_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
index 61fe85b..4642d51 100644 (file)
@@ -244,6 +244,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
        if (!o->opo_reserved) {
                /* special case, id was assigned outside of transaction
                 * see comments in osp_declare_attr_set */
+               LASSERT(d->opd_pre != NULL);
                spin_lock(&d->opd_pre_lock);
                osp_update_last_fid(d, fid);
                spin_unlock(&d->opd_pre_lock);
@@ -267,6 +268,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
 
        /* we might have lost precreated objects */
        if (unlikely(d->opd_gap_count) > 0) {
+               LASSERT(d->opd_pre != NULL);
                spin_lock(&d->opd_pre_lock);
                if (d->opd_gap_count > 0) {
                        int count = d->opd_gap_count;
@@ -402,6 +404,7 @@ static void osp_object_release(const struct lu_env *env, struct lu_object *o)
         * this may require lu_object_put() in LOD
         */
        if (unlikely(po->opo_reserved)) {
+               LASSERT(d->opd_pre != NULL);
                LASSERT(d->opd_pre_reserved > 0);
                spin_lock(&d->opd_pre_lock);
                d->opd_pre_reserved--;
index 6b74f12..6b1db95 100644 (file)
@@ -812,7 +812,7 @@ out:
        wake_up(&d->opd_pre_user_waitq);
 }
 
-static int osp_init_pre_fid(struct osp_device *osp)
+int osp_init_pre_fid(struct osp_device *osp)
 {
        struct lu_env           env;
        struct osp_thread_info  *osi;
@@ -821,6 +821,8 @@ static int osp_init_pre_fid(struct osp_device *osp)
        int                     rc;
        ENTRY;
 
+       LASSERT(osp->opd_pre != NULL);
+
        /* Return if last_used fid has been initialized */
        if (!fid_is_zero(&osp->opd_last_used_fid))
                RETURN(0);
@@ -910,20 +912,9 @@ static int osp_precreate_thread(void *_arg)
                        break;
 
                LASSERT(d->opd_obd->u.cli.cl_seq != NULL);
-               if (d->opd_obd->u.cli.cl_seq->lcs_exp == NULL) {
-                       /* Get new sequence for client first */
-                       LASSERT(d->opd_exp != NULL);
-                       d->opd_obd->u.cli.cl_seq->lcs_exp =
-                       class_export_get(d->opd_exp);
-                       rc = osp_init_pre_fid(d);
-                       if (rc != 0) {
-                               class_export_put(d->opd_exp);
-                               d->opd_obd->u.cli.cl_seq->lcs_exp = NULL;
-                               CERROR("%s: init pre fid error: rc = %d\n",
-                                      d->opd_obd->obd_name, rc);
-                               continue;
-                       }
-               }
+               /* Sigh, fid client is not ready yet */
+               if (d->opd_obd->u.cli.cl_seq->lcs_exp == NULL)
+                       continue;
 
                osp_statfs_update(d);
 
@@ -1250,6 +1241,10 @@ int osp_init_precreate(struct osp_device *d)
 
        ENTRY;
 
+       OBD_ALLOC_PTR(d->opd_pre);
+       if (d->opd_pre == NULL)
+               RETURN(-ENOMEM);
+
        /* initially precreation isn't ready */
        d->opd_pre_status = -EAGAIN;
        fid_zero(&d->opd_pre_used_fid);
@@ -1297,17 +1292,25 @@ int osp_init_precreate(struct osp_device *d)
 
 void osp_precreate_fini(struct osp_device *d)
 {
-       struct ptlrpc_thread *thread = &d->opd_pre_thread;
+       struct ptlrpc_thread *thread;
 
        ENTRY;
 
        cfs_timer_disarm(&d->opd_statfs_timer);
 
+       if (d->opd_pre == NULL)
+               RETURN_EXIT;
+
+       thread = &d->opd_pre_thread;
+
        thread->t_flags = SVC_STOPPING;
        wake_up(&d->opd_pre_waitq);
 
        wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
 
+       OBD_FREE_PTR(d->opd_pre);
+       d->opd_pre = NULL;
+
        EXIT;
 }
 
index 14a5566..fc19031 100644 (file)
@@ -348,6 +348,8 @@ static int osp_sync_interpret(const struct lu_env *env,
               rc, (unsigned) req->rq_transno);
        LASSERT(rc || req->rq_transno);
 
+       LASSERT(d->opd_pre != NULL);
+
        if (rc == -ENOENT) {
                /*
                 * we tried to destroy object or update attributes,
@@ -664,6 +666,7 @@ static void osp_sync_process_committed(const struct lu_env *env,
         * notice: we do this upon commit as well because some backends
         * (like DMU) do not release space right away.
         */
+       LASSERT(d->opd_pre != NULL);
        if (unlikely(d->opd_pre_status == -ENOSPC))
                osp_statfs_need_now(d);