Whamcloud - gitweb
LU-2279 osp: do not spin awaiting for connection
[fs/lustre-release.git] / lustre / osp / osp_precreate.c
index bc557e4..eafd24c 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel, Inc.
+ * Copyright (c) 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -371,12 +371,36 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        struct ptlrpc_request   *req = NULL;
        struct obd_import       *imp;
        struct ost_body         *body;
+       struct l_wait_info       lwi = { 0 };
+       int                      update_status = 0;
        int                      rc;
 
        ENTRY;
 
-       LASSERT(d->opd_recovery_completed);
-       LASSERT(d->opd_pre_reserved == 0);
+       /*
+        * wait for local recovery to finish, so we can cleanup orphans
+        * orphans are all objects since "last used" (assigned), but
+        * there might be objects reserved and in some cases they won't
+        * be used. we can't cleanup them till we're sure they won't be
+        * used. also can't we allow new reservations because they may
+        * end up getting orphans being cleaned up below. so we block
+        * new reservations and wait till all reserved objects either
+        * user or released.
+        */
+       spin_lock(&d->opd_pre_lock);
+       d->opd_pre_recovering = 1;
+       spin_unlock(&d->opd_pre_lock);
+       /*
+        * The locking above makes sure the opd_pre_reserved check below will
+        * catch all osp_precreate_reserve() calls who find
+        * "!opd_pre_recovering".
+        */
+       l_wait_event(d->opd_pre_waitq,
+                    (!d->opd_pre_reserved && d->opd_recovery_completed) ||
+                    !osp_precreate_running(d) || d->opd_got_disconnected,
+                    &lwi);
+       if (!osp_precreate_running(d) || d->opd_got_disconnected)
+               GOTO(out, rc = -EAGAIN);
 
        CDEBUG(D_HA, "%s: going to cleanup orphans since "LPU64"\n",
                d->opd_obd->obd_name, d->opd_last_used_id);
@@ -417,8 +441,10 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        req->rq_no_resend = req->rq_no_delay = 1;
 
        rc = ptlrpc_queue_wait(req);
-       if (rc)
+       if (rc) {
+               update_status = 1;
                GOTO(out, rc);
+       }
 
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
@@ -437,6 +463,10 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
                d->opd_pre_grow_count = OST_MIN_PRECREATE;
                d->opd_pre_last_created = body->oa.o_id;
        }
+       /*
+        * This empties the pre-creation pool and effectively blocks any new
+        * reservations.
+        */
        d->opd_pre_used_id = d->opd_pre_last_created;
        d->opd_pre_grow_slow = 0;
        spin_unlock(&d->opd_pre_lock);
@@ -449,6 +479,28 @@ out:
        if (req)
                ptlrpc_req_finished(req);
 
+       d->opd_pre_recovering = 0;
+
+       /*
+        * If rc is zero, the pre-creation window should have been emptied.
+        * Since waking up the herd would be useless without pre-created
+        * objects, we defer the signal to osp_precreate_send() in that case.
+        */
+       if (rc != 0) {
+               if (update_status) {
+                       CERROR("%s: cannot cleanup orphans: rc = %d\n",
+                              d->opd_obd->obd_name, rc);
+                       /* we can't proceed from here, OST seem to
+                        * be in a bad shape, better to wait for
+                        * a new instance of the server and repeat
+                        * from the beginning. notify possible waiters
+                        * this OSP isn't quite functional yet */
+                       osp_pre_update_status(d, rc);
+               } else {
+                       cfs_waitq_signal(&d->opd_pre_user_waitq);
+               }
+       }
+
        RETURN(rc);
 }
 
@@ -558,37 +610,11 @@ static int osp_precreate_thread(void *_arg)
                osp_statfs_update(d);
 
                /*
-                * wait for local recovery to finish, so we can cleanup orphans
-                * orphans are all objects since "last used" (assigned), but
-                * there might be objects reserved and in some cases they won't
-                * be used. we can't cleanup them till we're sure they won't be
-                * used. so we block new reservations and wait till all reserved
-                * objects either user or released.
+                * Clean up orphans or recreate missing objects.
                 */
-               l_wait_event(d->opd_pre_waitq, (!d->opd_pre_reserved &&
-                                               d->opd_recovery_completed) ||
-                            !osp_precreate_running(d) ||
-                            d->opd_got_disconnected, &lwi);
-
-               if (osp_precreate_running(d) && !d->opd_got_disconnected) {
-                       rc = osp_precreate_cleanup_orphans(d);
-                       if (rc) {
-                               CERROR("%s: cannot cleanup orphans: rc = %d\n",
-                                      d->opd_obd->obd_name,  rc);
-                               /* we can't proceed from here, OST seem to
-                                * be in a bad shape, better to wait for
-                                * a new instance of the server and repeat
-                                * from the beginning. notify possible waiters
-                                * this OSP isn't quite functional yet */
-                               osp_pre_update_status(d, rc);
-                               cfs_waitq_signal(&d->opd_pre_user_waitq);
-                               l_wait_event(d->opd_pre_waitq,
-                                            !osp_precreate_running(d) ||
-                                            d->opd_new_connection, &lwi);
-                               continue;
-
-                       }
-               }
+               rc = osp_precreate_cleanup_orphans(d);
+               if (rc != 0)
+                       continue;
 
                /*
                 * connected, can handle precreates now
@@ -635,6 +661,9 @@ static int osp_precreate_ready_condition(struct osp_device *d)
 {
        __u64 next;
 
+       if (d->opd_pre_recovering)
+               return 0;
+
        /* ready if got enough precreated objects */
        /* we need to wait for others (opd_pre_reserved) and our object (+1) */
        next = d->opd_pre_used_id + d->opd_pre_reserved + 1;
@@ -643,7 +672,7 @@ static int osp_precreate_ready_condition(struct osp_device *d)
 
        /* ready if OST reported no space and no destoys in progress */
        if (d->opd_syn_changes + d->opd_syn_rpc_in_progress == 0 &&
-           d->opd_pre_status != 0)
+           d->opd_pre_status == -ENOSPC)
                return 1;
 
        return 0;
@@ -661,7 +690,7 @@ static int osp_precreate_timeout_condition(void *data)
                      d->opd_syn_changes, d->opd_syn_rpc_in_progress,
                      d->opd_pre_status);
 
-       return 0;
+       return 1;
 }
 
 /*
@@ -682,9 +711,6 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
 
        LASSERT(d->opd_pre_last_created >= d->opd_pre_used_id);
 
-       lwi = LWI_TIMEOUT(cfs_time_seconds(obd_timeout),
-                         osp_precreate_timeout_condition, d);
-
        /*
         * wait till:
         *  - preallocation is done
@@ -693,10 +719,6 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
         */
        while ((rc = d->opd_pre_status) == 0 || rc == -ENOSPC ||
                rc == -ENODEV) {
-               if (unlikely(rc == -ENODEV)) {
-                       if (cfs_time_aftereq(cfs_time_current(), expire))
-                               break;
-               }
 
 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 90, 0)
                /*
@@ -724,7 +746,8 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
 
                spin_lock(&d->opd_pre_lock);
                precreated = d->opd_pre_last_created - d->opd_pre_used_id;
-               if (precreated > d->opd_pre_reserved) {
+               if (precreated > d->opd_pre_reserved &&
+                   !d->opd_pre_recovering) {
                        d->opd_pre_reserved++;
                        spin_unlock(&d->opd_pre_lock);
                        rc = 0;
@@ -762,6 +785,11 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
                /* XXX: don't wake up if precreation is in progress */
                cfs_waitq_signal(&d->opd_pre_waitq);
 
+               lwi = LWI_TIMEOUT(expire - cfs_time_current(),
+                               osp_precreate_timeout_condition, d);
+               if (cfs_time_aftereq(cfs_time_current(), expire))
+                       break;
+
                l_wait_event(d->opd_pre_user_waitq,
                             osp_precreate_ready_condition(d), &lwi);
        }