Whamcloud - gitweb
LU-8367 osp: enable replay for precreation request
[fs/lustre-release.git] / lustre / osp / osp_precreate.c
index 124ef8d..1b77f30 100644 (file)
@@ -81,6 +81,8 @@ static void osp_statfs_timer_cb(cfs_timer_cb_arg_t data)
        struct osp_device *d = cfs_from_timer(d, data, opd_statfs_timer);
 
        LASSERT(d);
+       /* invalidate statfs data so osp_precreate_thread() can refresh */
+       d->opd_statfs_fresh_till = ktime_sub_ns(ktime_get(), NSEC_PER_SEC);
        if (d->opd_pre_task)
                wake_up(&d->opd_pre_waitq);
 }
@@ -101,12 +103,13 @@ static void osp_pre_update_msfs(struct osp_device *d, struct obd_statfs *msfs);
 static void osp_pre_update_status_msfs(struct osp_device *d,
                                       struct obd_statfs *msfs, int rc)
 {
+       CDEBUG(D_INFO, "%s: Updating status = %d\n", d->opd_obd->obd_name, rc);
        if (rc)
                d->opd_pre_status = rc;
        else
                osp_pre_update_msfs(d, msfs);
 
-       wake_up(&d->opd_pre_user_waitq);
+       wake_up_all(&d->opd_pre_user_waitq);
 }
 
 /* Pass in the old statfs data in case the limits have changed */
@@ -335,8 +338,8 @@ static inline int osp_precreate_near_empty_nolock(const struct lu_env *env,
 
        /* don't consider new precreation till OST is healty and
         * has free space */
-       return ((window - d->opd_pre_reserved < d->opd_pre_create_count / 2) &&
-               (d->opd_pre_status == 0));
+       return ((window - d->opd_pre_reserved < d->opd_pre_create_count / 2 ||
+                d->opd_force_creation) && (d->opd_pre_status == 0));
 }
 
 /**
@@ -630,9 +633,11 @@ static int osp_precreate_send(const struct lu_env *env, struct osp_device *d)
        if (req == NULL)
                RETURN(-ENOMEM);
        req->rq_request_portal = OST_CREATE_PORTAL;
-       /* we should not resend create request - anyway we will have delorphan
-        * and kill these objects */
-       req->rq_no_delay = req->rq_no_resend = 1;
+
+       /* Delorphan happens only with a first MDT-OST connect. resend/replay
+        * handles objects creation on reconnects, no need to do delorhpan
+        * in this case.
+        */
 
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
@@ -641,7 +646,9 @@ static int osp_precreate_send(const struct lu_env *env, struct osp_device *d)
        }
 
        spin_lock(&d->opd_pre_lock);
-       if (d->opd_pre_create_count > d->opd_pre_max_create_count / 2)
+       if (d->opd_force_creation)
+               d->opd_pre_create_count = OST_MIN_PRECREATE;
+       else if (d->opd_pre_create_count > d->opd_pre_max_create_count / 2)
                d->opd_pre_create_count = d->opd_pre_max_create_count / 2;
        grow = d->opd_pre_create_count;
        spin_unlock(&d->opd_pre_lock);
@@ -679,7 +686,6 @@ static int osp_precreate_send(const struct lu_env *env, struct osp_device *d)
                        rc = -ENOTCONN;
                GOTO(out_req, rc);
        }
-       LASSERT(req->rq_transno == 0);
 
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
@@ -714,6 +720,7 @@ ready:
        fid_to_ostid(fid, &body->oa.o_oi);
 
        d->opd_pre_last_created_fid = *fid;
+       d->opd_force_creation = false;
        spin_unlock(&d->opd_pre_lock);
 
        CDEBUG(D_HA, "%s: current precreated pool: "DFID"-"DFID"\n",
@@ -722,7 +729,6 @@ ready:
 out_req:
        /* now we can wakeup all users awaiting for objects */
        osp_pre_update_status(d, rc);
-       wake_up(&d->opd_pre_user_waitq);
 
        /* pause to let osp_precreate_reserve to go first */
        CFS_FAIL_TIMEOUT(OBD_FAIL_OSP_PRECREATE_PAUSE, 2);
@@ -741,12 +747,13 @@ out_req:
  *
  * \param[in] env      LU environment provided by the caller
  * \param[in] d                OSP device
+ * \param[in] update   update or not update last used fid
  *
  * \retval 0           on success
  * \retval negative    negated errno on error
  **/
 static int osp_get_lastfid_from_ost(const struct lu_env *env,
-                                   struct osp_device *d)
+                                   struct osp_device *d, bool update)
 {
        struct ptlrpc_request   *req = NULL;
        struct obd_import       *imp;
@@ -804,9 +811,16 @@ static int osp_get_lastfid_from_ost(const struct lu_env *env,
 
        /* Only update the last used fid, if the OST has objects for
         * this sequence, i.e. fid_oid > 0 */
-       if (fid_oid(last_fid) > 0)
+       if (fid_oid(last_fid) > 0 && update)
                d->opd_last_used_fid = *last_fid;
 
+       if (fid_oid(last_fid) == 0 &&
+           fid_seq(last_fid) == fid_seq(&d->opd_last_used_fid)) {
+               /* reformatted OST, it requires creation request
+                * to recreate objects
+                */
+               d->opd_force_creation = true;
+       }
        CDEBUG(D_HA, "%s: Got last_fid "DFID"\n", d->opd_obd->obd_name,
               PFID(last_fid));
 
@@ -847,6 +861,15 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        ENTRY;
 
        /*
+        * Do cleanup orphans only with a first connection, after that
+        * all precreate requests uses resend/replay flags to support OST
+        * failover/reconnect.
+        */
+       if (d->opd_cleanup_orphans_done) {
+               rc = osp_get_lastfid_from_ost(env, d, false);
+               RETURN(0);
+       }
+       /*
         * wait for local recovery to finish, so we can cleanup orphans
         * orphans are all objects since "last used" (assigned), but
         * there might be objects reserved and in some cases they won't
@@ -881,7 +904,7 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        if (fid_oid(&d->opd_last_used_fid) < 2 ||
            OBD_FAIL_CHECK(OBD_FAIL_OSP_GET_LAST_FID)) {
                /* lastfid looks strange... ask OST */
-               rc = osp_get_lastfid_from_ost(env, d);
+               rc = osp_get_lastfid_from_ost(env, d, true);
                if (rc)
                        GOTO(out, rc);
        }
@@ -955,6 +978,7 @@ out:
        if (req)
                ptlrpc_req_finished(req);
 
+
        /*
         * If rc is zero, the pre-creation window should have been emptied.
         * Since waking up the herd would be useless without pre-created
@@ -971,12 +995,13 @@ out:
                         * this OSP isn't quite functional yet */
                        osp_pre_update_status(d, rc);
                } else {
-                       wake_up(&d->opd_pre_user_waitq);
+                       wake_up_all(&d->opd_pre_user_waitq);
                }
        } else {
                spin_lock(&d->opd_pre_lock);
                d->opd_pre_recovering = 0;
                spin_unlock(&d->opd_pre_lock);
+               d->opd_cleanup_orphans_done = true;
        }
 
        RETURN(rc);
@@ -1364,6 +1389,19 @@ static int osp_precreate_thread(void *_args)
 static int osp_precreate_ready_condition(const struct lu_env *env,
                                         struct osp_device *d)
 {
+       /* Bail out I/O fails to OST */
+       if (d->opd_pre_status != 0 &&
+           d->opd_pre_status != -EAGAIN &&
+           d->opd_pre_status != -ENODEV &&
+           d->opd_pre_status != -ENOTCONN &&
+           d->opd_pre_status != -ENOSPC) {
+               /* DEBUG LU-3230 */
+               if (d->opd_pre_status != -EIO)
+                       CERROR("%s: precreate failed opd_pre_status %d\n",
+                              d->opd_obd->obd_name, d->opd_pre_status);
+               return 1;
+       }
+
        if (d->opd_pre_recovering)
                return 0;
 
@@ -1378,19 +1416,6 @@ static int osp_precreate_ready_condition(const struct lu_env *env,
            d->opd_pre_status == -ENOSPC)
                return 1;
 
-       /* Bail out I/O fails to OST */
-       if (d->opd_pre_status != 0 &&
-           d->opd_pre_status != -EAGAIN &&
-           d->opd_pre_status != -ENODEV &&
-           d->opd_pre_status != -ENOTCONN &&
-           d->opd_pre_status != -ENOSPC) {
-               /* DEBUG LU-3230 */
-               if (d->opd_pre_status != -EIO)
-                       CERROR("%s: precreate failed opd_pre_status %d\n",
-                              d->opd_obd->obd_name, d->opd_pre_status);
-               return 1;
-       }
-
        return 0;
 }
 
@@ -1513,6 +1538,8 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d,
                        break;
                }
 
+               CDEBUG(D_INFO, "%s: Sleeping on objects\n",
+                      d->opd_obd->obd_name);
                if (wait_event_idle_timeout(
                            d->opd_pre_user_waitq,
                            osp_precreate_ready_condition(env, d),
@@ -1527,6 +1554,9 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d,
                               atomic_read(&d->opd_sync_changes),
                               atomic_read(&d->opd_sync_rpcs_in_progress),
                               d->opd_pre_status);
+               } else {
+                       CDEBUG(D_INFO, "%s: Waked up, status=%d\n",
+                              d->opd_obd->obd_name, d->opd_pre_status);
                }
        }
 
@@ -1741,6 +1771,8 @@ int osp_init_precreate(struct osp_device *d)
        d->opd_pre_max_create_count = OST_MAX_PRECREATE;
        d->opd_reserved_mb_high = 0;
        d->opd_reserved_mb_low = 0;
+       d->opd_cleanup_orphans_done = false;
+       d->opd_force_creation = false;
 
        RETURN(0);
 }