Whamcloud - gitweb
LU-4621 ofd: create same count of objects
[fs/lustre-release.git] / lustre / osp / osp_precreate.c
index 20783fc..7c28e1a 100644 (file)
@@ -65,12 +65,29 @@ static inline int osp_statfs_need_update(struct osp_device *d)
                                d->opd_statfs_fresh_till);
 }
 
+/*
+ * OSP tries to maintain pool of available objects so that calls to create
+ * objects don't block most of time
+ *
+ * each time OSP gets connected to OST, we should start from precreation cleanup
+ */
+static inline bool osp_precreate_running(struct osp_device *d)
+{
+       return !!(d->opd_pre_thread.t_flags & SVC_RUNNING);
+}
+
+static inline bool osp_precreate_stopped(struct osp_device *d)
+{
+       return !!(d->opd_pre_thread.t_flags & SVC_STOPPED);
+}
+
 static void osp_statfs_timer_cb(unsigned long _d)
 {
        struct osp_device *d = (struct osp_device *) _d;
 
        LASSERT(d);
-       wake_up(&d->opd_pre_waitq);
+       if (d->opd_pre != NULL && osp_precreate_running(d))
+               wake_up(&d->opd_pre_waitq);
 }
 
 static int osp_statfs_interpret(const struct lu_env *env,
@@ -108,7 +125,9 @@ static int osp_statfs_interpret(const struct lu_env *env,
        RETURN(0);
 out:
        /* couldn't update statfs, try again as soon as possible */
-       wake_up(&d->opd_pre_waitq);
+       if (d->opd_pre != NULL && osp_precreate_running(d))
+               wake_up(&d->opd_pre_waitq);
+
        if (req->rq_import_generation == imp->imp_generation)
                CDEBUG(D_CACHE, "%s: couldn't update statfs: rc = %d\n",
                       d->opd_obd->obd_name, rc);
@@ -178,45 +197,11 @@ void osp_statfs_need_now(struct osp_device *d)
        }
 }
 
-
-/*
- * OSP tries to maintain pool of available objects so that calls to create
- * objects don't block most of time
- *
- * each time OSP gets connected to OST, we should start from precreation cleanup
- */
-static inline int osp_precreate_running(struct osp_device *d)
-{
-       return !!(d->opd_pre_thread.t_flags & SVC_RUNNING);
-}
-
-static inline int osp_precreate_stopped(struct osp_device *d)
-{
-       return !!(d->opd_pre_thread.t_flags & SVC_STOPPED);
-}
-
 static inline int osp_objs_precreated(const struct lu_env *env,
                                      struct osp_device *osp)
 {
-       struct lu_fid *fid1 = &osp->opd_pre_last_created_fid;
-       struct lu_fid *fid2 = &osp->opd_pre_used_fid;
-
-       LASSERTF(fid_seq(fid1) == fid_seq(fid2),
-                "Created fid"DFID" Next fid "DFID"\n", PFID(fid1), PFID(fid2));
-
-       if (fid_is_idif(fid1)) {
-               struct ost_id *oi1 = &osp_env_info(env)->osi_oi;
-               struct ost_id *oi2 = &osp_env_info(env)->osi_oi2;
-
-               LASSERT(fid_is_idif(fid1) && fid_is_idif(fid2));
-               fid_to_ostid(fid1, oi1);
-               fid_to_ostid(fid2, oi2);
-               LASSERT(ostid_id(oi1) >= ostid_id(oi2));
-
-               return ostid_id(oi1) - ostid_id(oi2);
-       }
-
-       return fid_oid(fid1) - fid_oid(fid2);
+       return osp_fid_diff(&osp->opd_pre_last_created_fid,
+                           &osp->opd_pre_used_fid);
 }
 
 static inline int osp_precreate_near_empty_nolock(const struct lu_env *env,
@@ -286,12 +271,12 @@ int osp_write_last_oid_seq_files(struct lu_env *env, struct osp_device *osp,
 
        th->th_sync |= sync;
        rc = dt_declare_record_write(env, osp->opd_last_used_oid_file,
-                                    lb_oid->lb_len, oid_off, th);
+                                    lb_oid, oid_off, th);
        if (rc != 0)
                GOTO(out, rc);
 
        rc = dt_declare_record_write(env, osp->opd_last_used_seq_file,
-                                    lb_oseq->lb_len, oseq_off, th);
+                                    lb_oseq, oseq_off, th);
        if (rc != 0)
                GOTO(out, rc);
 
@@ -488,11 +473,14 @@ static int osp_precreate_send(const struct lu_env *env, struct osp_device *d)
                GOTO(out_req, rc = -EPROTO);
 
        ostid_to_fid(fid, &body->oa.o_oi, d->opd_index);
-       LASSERTF(lu_fid_diff(fid, &d->opd_pre_used_fid) > 0,
-                "reply fid "DFID" pre used fid "DFID"\n", PFID(fid),
-                PFID(&d->opd_pre_used_fid));
+       if (osp_fid_diff(fid, &d->opd_pre_used_fid) <= 0) {
+               CERROR("%s: precreate fid "DFID" < local used fid "DFID
+                      ": rc = %d\n", d->opd_obd->obd_name,
+                      PFID(fid), PFID(&d->opd_pre_used_fid), -ESTALE);
+               GOTO(out_req, rc = -ESTALE);
+       }
 
-       diff = lu_fid_diff(fid, &d->opd_pre_last_created_fid);
+       diff = osp_fid_diff(fid, &d->opd_pre_last_created_fid);
 
        spin_lock(&d->opd_pre_lock);
        if (diff < grow) {
@@ -507,6 +495,9 @@ static int osp_precreate_send(const struct lu_env *env, struct osp_device *d)
                d->opd_pre_grow_slow = 0;
        }
 
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       fid_to_ostid(fid, &body->oa.o_oi);
+
        d->opd_pre_last_created_fid = *fid;
        spin_unlock(&d->opd_pre_lock);
 
@@ -694,7 +685,7 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        ostid_to_fid(last_fid, &body->oa.o_oi, d->opd_index);
 
        spin_lock(&d->opd_pre_lock);
-       diff = lu_fid_diff(&d->opd_last_used_fid, last_fid);
+       diff = osp_fid_diff(&d->opd_last_used_fid, last_fid);
        if (diff > 0) {
                d->opd_pre_grow_count = OST_MIN_PRECREATE + diff;
                d->opd_pre_last_created_fid = d->opd_last_used_fid;
@@ -812,7 +803,7 @@ out:
        wake_up(&d->opd_pre_user_waitq);
 }
 
-static int osp_init_pre_fid(struct osp_device *osp)
+int osp_init_pre_fid(struct osp_device *osp)
 {
        struct lu_env           env;
        struct osp_thread_info  *osi;
@@ -821,6 +812,8 @@ static int osp_init_pre_fid(struct osp_device *osp)
        int                     rc;
        ENTRY;
 
+       LASSERT(osp->opd_pre != NULL);
+
        /* Return if last_used fid has been initialized */
        if (!fid_is_zero(&osp->opd_last_used_fid))
                RETURN(0);
@@ -910,19 +903,18 @@ static int osp_precreate_thread(void *_arg)
                        break;
 
                LASSERT(d->opd_obd->u.cli.cl_seq != NULL);
-               if (d->opd_obd->u.cli.cl_seq->lcs_exp == NULL) {
-                       /* Get new sequence for client first */
-                       LASSERT(d->opd_exp != NULL);
-                       d->opd_obd->u.cli.cl_seq->lcs_exp =
-                       class_export_get(d->opd_exp);
-                       rc = osp_init_pre_fid(d);
-                       if (rc != 0) {
-                               class_export_put(d->opd_exp);
-                               d->opd_obd->u.cli.cl_seq->lcs_exp = NULL;
-                               CERROR("%s: init pre fid error: rc = %d\n",
-                                      d->opd_obd->obd_name, rc);
-                               continue;
-                       }
+               /* Sigh, fid client is not ready yet */
+               if (d->opd_obd->u.cli.cl_seq->lcs_exp == NULL)
+                       continue;
+
+               /* Init fid for osp_precreate if necessary */
+               rc = osp_init_pre_fid(d);
+               if (rc != 0) {
+                       class_export_put(d->opd_exp);
+                       d->opd_obd->u.cli.cl_seq->lcs_exp = NULL;
+                       CERROR("%s: init pre fid error: rc = %d\n",
+                              d->opd_obd->obd_name, rc);
+                       continue;
                }
 
                osp_statfs_update(d);
@@ -1010,8 +1002,16 @@ static int osp_precreate_ready_condition(const struct lu_env *env,
                return 1;
 
        /* Bail out I/O fails to OST */
-       if (d->opd_pre_status == -EIO)
+       if (d->opd_pre_status != 0 &&
+           d->opd_pre_status != -EAGAIN &&
+           d->opd_pre_status != -ENODEV &&
+           d->opd_pre_status != -ENOSPC) {
+               /* DEBUG LU-3230 */
+               if (d->opd_pre_status != -EIO)
+                       CERROR("%s: precreate failed opd_pre_status %d\n",
+                              d->opd_obd->obd_name, d->opd_pre_status);
                return 1;
+       }
 
        return 0;
 }
@@ -1020,13 +1020,13 @@ static int osp_precreate_timeout_condition(void *data)
 {
        struct osp_device *d = data;
 
-       LCONSOLE_WARN("%s: slow creates, last="DFID", next="DFID", "
-                     "reserved="LPU64", syn_changes=%lu, "
-                     "syn_rpc_in_progress=%d, status=%d\n",
-                     d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid),
-                     PFID(&d->opd_pre_used_fid), d->opd_pre_reserved,
-                     d->opd_syn_changes, d->opd_syn_rpc_in_progress,
-                     d->opd_pre_status);
+       CDEBUG(D_HA, "%s: slow creates, last="DFID", next="DFID", "
+             "reserved="LPU64", syn_changes=%lu, "
+             "syn_rpc_in_progress=%d, status=%d\n",
+             d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid),
+             PFID(&d->opd_pre_used_fid), d->opd_pre_reserved,
+             d->opd_syn_changes, d->opd_syn_rpc_in_progress,
+             d->opd_pre_status);
 
        return 1;
 }
@@ -1058,7 +1058,7 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
         *  - OST can allocate fid sequence.
         */
        while ((rc = d->opd_pre_status) == 0 || rc == -ENOSPC ||
-               rc == -ENODEV || rc == -EAGAIN) {
+               rc == -ENODEV || rc == -EAGAIN || rc == -ENOTCONN) {
 
                /*
                 * increase number of precreations
@@ -1138,7 +1138,7 @@ int osp_precreate_get_fid(const struct lu_env *env, struct osp_device *d,
        /* grab next id from the pool */
        spin_lock(&d->opd_pre_lock);
 
-       LASSERTF(lu_fid_diff(&d->opd_pre_used_fid,
+       LASSERTF(osp_fid_diff(&d->opd_pre_used_fid,
                             &d->opd_pre_last_created_fid) < 0,
                 "next fid "DFID" last created fid "DFID"\n",
                 PFID(&d->opd_pre_used_fid),
@@ -1242,6 +1242,10 @@ int osp_init_precreate(struct osp_device *d)
 
        ENTRY;
 
+       OBD_ALLOC_PTR(d->opd_pre);
+       if (d->opd_pre == NULL)
+               RETURN(-ENOMEM);
+
        /* initially precreation isn't ready */
        d->opd_pre_status = -EAGAIN;
        fid_zero(&d->opd_pre_used_fid);
@@ -1274,7 +1278,7 @@ int osp_init_precreate(struct osp_device *d)
         * start thread handling precreation and statfs updates
         */
        task = kthread_run(osp_precreate_thread, d,
-                              "osp-pre-%u", d->opd_index);
+                          "osp-pre-%u-%u", d->opd_index, d->opd_group);
        if (IS_ERR(task)) {
                CERROR("can't start precreate thread %ld\n", PTR_ERR(task));
                RETURN(PTR_ERR(task));
@@ -1289,17 +1293,25 @@ int osp_init_precreate(struct osp_device *d)
 
 void osp_precreate_fini(struct osp_device *d)
 {
-       struct ptlrpc_thread *thread = &d->opd_pre_thread;
+       struct ptlrpc_thread *thread;
 
        ENTRY;
 
        cfs_timer_disarm(&d->opd_statfs_timer);
 
+       if (d->opd_pre == NULL)
+               RETURN_EXIT;
+
+       thread = &d->opd_pre_thread;
+
        thread->t_flags = SVC_STOPPING;
        wake_up(&d->opd_pre_waitq);
 
        wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
 
+       OBD_FREE_PTR(d->opd_pre);
+       d->opd_pre = NULL;
+
        EXIT;
 }