Whamcloud - gitweb
LU-8972 osp: skip subsequent orphan cleanups
[fs/lustre-release.git] / lustre / osp / osp_precreate.c
index 435d618..ae7e4ef 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -141,7 +141,7 @@ static int osp_statfs_interpret(const struct lu_env *env,
 
        /* schedule next update */
        d->opd_statfs_fresh_till = cfs_time_shift(d->opd_statfs_maxage);
-       cfs_timer_arm(&d->opd_statfs_timer, d->opd_statfs_fresh_till);
+       mod_timer(&d->opd_statfs_timer, d->opd_statfs_fresh_till);
        d->opd_statfs_update_in_progress = 0;
 
        CDEBUG(D_CACHE, "updated statfs %p\n", d);
@@ -202,7 +202,7 @@ static int osp_statfs_update(struct osp_device *d)
        /*
         * no updates till reply
         */
-       cfs_timer_disarm(&d->opd_statfs_timer);
+       del_timer(&d->opd_statfs_timer);
        d->opd_statfs_fresh_till = cfs_time_shift(obd_timeout * 1000);
        d->opd_statfs_update_in_progress = 1;
 
@@ -233,7 +233,7 @@ void osp_statfs_need_now(struct osp_device *d)
                 * is replied
                 */
                d->opd_statfs_fresh_till = cfs_time_shift(-1);
-               cfs_timer_disarm(&d->opd_statfs_timer);
+               del_timer(&d->opd_statfs_timer);
                wake_up(&d->opd_pre_waitq);
        }
 }
@@ -573,6 +573,7 @@ static int osp_precreate_send(const struct lu_env *env, struct osp_device *d)
                RETURN(rc);
        }
 
+       LASSERT(d->opd_pre->osp_pre_delorphan_sent != 0);
        spin_lock(&d->opd_pre_lock);
        if (d->opd_pre_create_count > d->opd_pre_max_create_count / 2)
                d->opd_pre_create_count = d->opd_pre_max_create_count / 2;
@@ -778,18 +779,20 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        int                      update_status = 0;
        int                      rc;
        int                      diff;
+       struct lu_fid            fid;
 
        ENTRY;
 
        /*
-        * wait for local recovery to finish, so we can cleanup orphans
-        * orphans are all objects since "last used" (assigned), but
-        * there might be objects reserved and in some cases they won't
-        * be used. we can't cleanup them till we're sure they won't be
-        * used. also can't we allow new reservations because they may
-        * end up getting orphans being cleaned up below. so we block
-        * new reservations and wait till all reserved objects either
-        * user or released.
+        * Wait for local recovery to finish, so we can clean up orphans.
+        * Orphans are all objects past "last used" (assigned).
+        * Treat reserved objects as created; otherwise we can get into
+        * a livelock where one blocked thread holding a reservation
+        * blocks recovery. See LU-8367 for the details. In some cases this
+        * can result in gaps (i.e. leaked objects), but we have LFSCK.
+        *
+        * Do not allow new reservations, because they could be handed
+        * objects that are cleaned up as orphans below. So block them.
         */
        spin_lock(&d->opd_pre_lock);
        d->opd_pre_recovering = 1;
@@ -799,16 +802,12 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
         * catch all osp_precreate_reserve() calls who find
         * "!opd_pre_recovering".
         */
-       l_wait_event(d->opd_pre_waitq,
-                    (!d->opd_pre_reserved && d->opd_recovery_completed) ||
+       l_wait_event(d->opd_pre_waitq, d->opd_recovery_completed ||
                     !osp_precreate_running(d) || d->opd_got_disconnected,
                     &lwi);
        if (!osp_precreate_running(d) || d->opd_got_disconnected)
                GOTO(out, rc = -EAGAIN);
 
-       CDEBUG(D_HA, "%s: going to cleanup orphans since "DFID"\n",
-              d->opd_obd->obd_name, PFID(&d->opd_last_used_fid));
-
        *last_fid = d->opd_last_used_fid;
        /* The OSP should already get the valid seq now */
        LASSERT(!fid_is_zero(last_fid));
@@ -837,10 +836,24 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        if (body == NULL)
                GOTO(out, rc = -EPROTO);
 
-       body->oa.o_flags = OBD_FL_DELORPHAN;
+       body->oa.o_flags = 0;
        body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
 
-       fid_to_ostid(&d->opd_last_used_fid, &body->oa.o_oi);
+       /* unless this is the very first DELORPHAN (when we really
+        * can destroy some orphans), just tell OST to recreate
+        * missing objects in our precreate pool */
+       spin_lock(&d->opd_pre_lock);
+       if (d->opd_pre->osp_pre_delorphan_sent) {
+               fid = d->opd_pre_last_created_fid;
+       } else {
+               fid = d->opd_last_used_fid;
+               body->oa.o_flags = OBD_FL_DELORPHAN;
+       }
+       spin_unlock(&d->opd_pre_lock);
+       fid_to_ostid(&fid, &body->oa.o_oi);
+
+       CDEBUG(D_HA, "%s: going to cleanup orphans since "DFID"\n",
+              d->opd_obd->obd_name, PFID(&fid));
 
        ptlrpc_request_set_replen(req);
 
@@ -863,10 +876,10 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
        ostid_to_fid(last_fid, &body->oa.o_oi, d->opd_index);
 
        spin_lock(&d->opd_pre_lock);
-       diff = osp_fid_diff(&d->opd_last_used_fid, last_fid);
+       diff = osp_fid_diff(&fid, last_fid);
        if (diff > 0) {
                d->opd_pre_create_count = OST_MIN_PRECREATE + diff;
-               d->opd_pre_last_created_fid = d->opd_last_used_fid;
+               d->opd_pre_last_created_fid = *last_fid;
        } else {
                d->opd_pre_create_count = OST_MIN_PRECREATE;
                d->opd_pre_last_created_fid = *last_fid;
@@ -877,9 +890,11 @@ static int osp_precreate_cleanup_orphans(struct lu_env *env,
         */
        LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
                LUSTRE_DATA_SEQ_MAX_WIDTH);
-       d->opd_pre_used_fid = d->opd_pre_last_created_fid;
+       if (d->opd_pre->osp_pre_delorphan_sent == 0)
+               d->opd_pre_used_fid = d->opd_pre_last_created_fid;
        d->opd_pre_create_slow = 0;
        spin_unlock(&d->opd_pre_lock);
+       d->opd_pre->osp_pre_delorphan_sent = 1;
 
        CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
               "last_used is "DFID"\n", d->opd_obd->obd_name, PFID(last_fid),
@@ -888,10 +903,6 @@ out:
        if (req)
                ptlrpc_req_finished(req);
 
-       spin_lock(&d->opd_pre_lock);
-       d->opd_pre_recovering = 0;
-       spin_unlock(&d->opd_pre_lock);
-
        /*
         * If rc is zero, the pre-creation window should have been emptied.
         * Since waking up the herd would be useless without pre-created
@@ -910,6 +921,10 @@ out:
                } else {
                        wake_up(&d->opd_pre_user_waitq);
                }
+       } else {
+               spin_lock(&d->opd_pre_lock);
+               d->opd_pre_recovering = 0;
+               spin_unlock(&d->opd_pre_lock);
        }
 
        RETURN(rc);
@@ -1148,6 +1163,10 @@ static int osp_precreate_thread(void *_arg)
                 * need to be connected to OST
                 */
                while (osp_precreate_running(d)) {
+                       if (d->opd_pre_recovering &&
+                           d->opd_imp_connected &&
+                           !d->opd_got_disconnected)
+                               break;
                        l_wait_event(d->opd_pre_waitq,
                                     !osp_precreate_running(d) ||
                                     d->opd_new_connection,
@@ -1189,8 +1208,10 @@ static int osp_precreate_thread(void *_arg)
                 * Clean up orphans or recreate missing objects.
                 */
                rc = osp_precreate_cleanup_orphans(&env, d);
-               if (rc != 0)
+               if (rc != 0) {
+                       schedule_timeout_interruptible(cfs_time_seconds(1));
                        continue;
+               }
                /*
                 * connected, can handle precreates now
                 */
@@ -1355,6 +1376,12 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
        if (d->opd_pre_max_create_count == 0)
                RETURN(-ENOBUFS);
 
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_OSP_PRECREATE_WAIT)) {
+               if (d->opd_index == cfs_fail_val)
+                       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_OSP_PRECREATE_WAIT,
+                                        obd_timeout);
+       }
+
        /*
         * wait till:
         *  - preallocation is done
@@ -1643,7 +1670,8 @@ int osp_init_precreate(struct osp_device *d)
        CDEBUG(D_OTHER, "current %llu, fresh till %llu\n",
               (unsigned long long)cfs_time_current(),
               (unsigned long long)d->opd_statfs_fresh_till);
-       cfs_timer_init(&d->opd_statfs_timer, osp_statfs_timer_cb, d);
+       setup_timer(&d->opd_statfs_timer, osp_statfs_timer_cb,
+                   (unsigned long)d);
 
        /*
         * start thread handling precreation and statfs updates
@@ -1677,7 +1705,7 @@ void osp_precreate_fini(struct osp_device *d)
 
        ENTRY;
 
-       cfs_timer_disarm(&d->opd_statfs_timer);
+       del_timer(&d->opd_statfs_timer);
 
        if (d->opd_pre == NULL)
                RETURN_EXIT;