LU-2285 osp: Block new reservations during orphan recoveries
diff --git a/lustre/osp/osp_precreate.c b/lustre/osp/osp_precreate.c
index 53cfef9..81b5cf0 100644
@@ -112,7 +112,7 @@ out:
        /* couldn't update statfs, try again as soon as possible */
        cfs_waitq_signal(&d->opd_pre_waitq);
        if (req->rq_import_generation == imp->imp_generation)
-               CERROR("%s: couldn't update statfs: rc = %d\n",
+               CDEBUG(D_CACHE, "%s: couldn't update statfs: rc = %d\n",
                       d->opd_obd->obd_name, rc);
        RETURN(rc);
 }
@@ -148,16 +148,15 @@ static int osp_statfs_update(struct osp_device *d)
        aa = ptlrpc_req_async_args(req);
        aa->pointer_arg[0] = d;
 
-       ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
-
-       cfs_timer_disarm(&d->opd_statfs_timer);
-
        /*
         * no updates till reply
         */
+       cfs_timer_disarm(&d->opd_statfs_timer);
        d->opd_statfs_fresh_till = cfs_time_shift(obd_timeout * 1000);
        d->opd_statfs_update_in_progress = 1;
 
+       ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+
        RETURN(0);
 }
 
@@ -200,7 +199,7 @@ static inline int osp_precreate_stopped(struct osp_device *d)
 
 static inline int osp_precreate_near_empty_nolock(struct osp_device *d)
 {
-       int window = d->opd_pre_last_created - d->opd_pre_next;
+       int window = d->opd_pre_last_created - d->opd_pre_used_id;
 
        /* don't consider new precreation till OST is healthy and
         * has free space */
@@ -213,9 +212,9 @@ static inline int osp_precreate_near_empty(struct osp_device *d)
        int rc;
 
        /* XXX: do we really need locking here? */
-       cfs_spin_lock(&d->opd_pre_lock);
+       spin_lock(&d->opd_pre_lock);
        rc = osp_precreate_near_empty_nolock(d);
-       cfs_spin_unlock(&d->opd_pre_lock);
+       spin_unlock(&d->opd_pre_lock);
        return rc;
 }
 
@@ -255,11 +254,11 @@ static int osp_precreate_send(struct osp_device *d)
                RETURN(rc);
        }
 
-       cfs_spin_lock(&d->opd_pre_lock);
+       spin_lock(&d->opd_pre_lock);
        if (d->opd_pre_grow_count > d->opd_pre_max_grow_count / 2)
                d->opd_pre_grow_count = d->opd_pre_max_grow_count / 2;
        grow = d->opd_pre_grow_count;
-       cfs_spin_unlock(&d->opd_pre_lock);
+       spin_unlock(&d->opd_pre_lock);
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
@@ -281,12 +280,13 @@ static int osp_precreate_send(struct osp_device *d)
        if (body == NULL)
                GOTO(out_req, rc = -EPROTO);
 
-       CDEBUG(D_HA, "new last_created %lu\n", (unsigned long) body->oa.o_id);
-       LASSERT(body->oa.o_id > d->opd_pre_next);
+       CDEBUG(D_HA, "%s: new last_created "LPU64"\n", d->opd_obd->obd_name,
+              body->oa.o_id);
+       LASSERT(body->oa.o_id > d->opd_pre_used_id);
 
        diff = body->oa.o_id - d->opd_pre_last_created;
 
-       cfs_spin_lock(&d->opd_pre_lock);
+       spin_lock(&d->opd_pre_lock);
        if (diff < grow) {
                /* the OST has not managed to create all the
                 * objects we asked for */
@@ -299,9 +299,9 @@ static int osp_precreate_send(struct osp_device *d)
                d->opd_pre_grow_slow = 0;
        }
        d->opd_pre_last_created = body->oa.o_id;
-       cfs_spin_unlock(&d->opd_pre_lock);
+       spin_unlock(&d->opd_pre_lock);
        CDEBUG(D_OTHER, "current precreated pool: %llu-%llu\n",
-              d->opd_pre_next, d->opd_pre_last_created);
+              d->opd_pre_used_id, d->opd_pre_last_created);
 
 out_req:
        /* now we can wake up all users waiting for objects */
@@ -371,12 +371,36 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        struct ptlrpc_request   *req = NULL;
        struct obd_import       *imp;
        struct ost_body         *body;
+       struct l_wait_info       lwi = { 0 };
+       int                      update_status = 0;
        int                      rc;
 
        ENTRY;
 
-       LASSERT(d->opd_recovery_completed);
-       LASSERT(d->opd_pre_reserved == 0);
+       /*
+        * wait for local recovery to finish, so we can clean up orphans.
+        * orphans are all objects since "last used" (assigned), but
+        * there might be objects reserved and in some cases they won't
+        * be used. we can't clean them up till we're sure they won't be
+        * used. nor can we allow new reservations, because they may end
+        * up as orphans and get cleaned up below. so we block new
+        * reservations and wait till all reserved objects are either
+        * used or released.
+        */
+       spin_lock(&d->opd_pre_lock);
+       d->opd_pre_recovering = 1;
+       spin_unlock(&d->opd_pre_lock);
+       /*
+        * The locking above makes sure the opd_pre_reserved check below will
+        * catch all osp_precreate_reserve() calls that found
+        * "!opd_pre_recovering".
+        */
+       l_wait_event(d->opd_pre_waitq,
+                    (!d->opd_pre_reserved && d->opd_recovery_completed) ||
+                    !osp_precreate_running(d) || d->opd_got_disconnected,
+                    &lwi);
+       if (!osp_precreate_running(d) || d->opd_got_disconnected)
+               GOTO(out, rc = -EAGAIN);
 
        CDEBUG(D_HA, "%s: going to cleanup orphans since "LPU64"\n",
                d->opd_obd->obd_name, d->opd_last_used_id);
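
A condensed sketch of the handshake introduced here, stitched together from this hunk and the osp_precreate_reserve() hunk further down (not part of the patch; the helper names and return convention are invented for illustration, while the fields, lock and wait calls are the ones this diff already uses):

static void osp_pre_block_new_reservations(struct osp_device *d)
{
        struct l_wait_info lwi = { 0 };

        /* publish the flag under the same lock reservers take */
        spin_lock(&d->opd_pre_lock);
        d->opd_pre_recovering = 1;
        spin_unlock(&d->opd_pre_lock);

        /* wait till every reservation taken before the flag became
         * visible has been either used or released */
        l_wait_event(d->opd_pre_waitq,
                     (!d->opd_pre_reserved && d->opd_recovery_completed) ||
                     !osp_precreate_running(d) || d->opd_got_disconnected,
                     &lwi);
}

static int osp_pre_try_reserve(struct osp_device *d)
{
        int rc = -ENOSPC;

        /* a reserver either sees opd_pre_recovering and backs off, or it
         * bumps opd_pre_reserved before the recovery thread samples it */
        spin_lock(&d->opd_pre_lock);
        if (d->opd_pre_last_created - d->opd_pre_used_id >
            d->opd_pre_reserved && !d->opd_pre_recovering) {
                d->opd_pre_reserved++;
                rc = 0;
        }
        spin_unlock(&d->opd_pre_lock);
        return rc;
}
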
@@ -409,8 +433,7 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
        body->oa.o_seq = FID_SEQ_OST_MDT0;
 
-       /* remove from NEXT after used one */
-       body->oa.o_id = d->opd_last_used_id + 1;
+       body->oa.o_id = d->opd_last_used_id;
 
        ptlrpc_request_set_replen(req);
 
@@ -418,8 +441,10 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        req->rq_no_resend = req->rq_no_delay = 1;
 
        rc = ptlrpc_queue_wait(req);
-       if (rc)
+       if (rc) {
+               update_status = 1;
                GOTO(out, rc);
+       }
 
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
@@ -428,28 +453,54 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        /*
         * OST provides us with id new pool starts from in body->oa.o_id
         */
-       cfs_spin_lock(&d->opd_pre_lock);
+       spin_lock(&d->opd_pre_lock);
        if (le64_to_cpu(d->opd_last_used_id) > body->oa.o_id) {
                d->opd_pre_grow_count = OST_MIN_PRECREATE +
                                        le64_to_cpu(d->opd_last_used_id) -
                                        body->oa.o_id;
-               d->opd_pre_last_created = le64_to_cpu(d->opd_last_used_id) + 1;
+               d->opd_pre_last_created = le64_to_cpu(d->opd_last_used_id);
        } else {
                d->opd_pre_grow_count = OST_MIN_PRECREATE;
-               d->opd_pre_last_created = body->oa.o_id + 1;
+               d->opd_pre_last_created = body->oa.o_id;
        }
-       d->opd_pre_next = d->opd_pre_last_created;
+       /*
+        * This empties the pre-creation pool and effectively blocks any new
+        * reservations.
+        */
+       d->opd_pre_used_id = d->opd_pre_last_created;
        d->opd_pre_grow_slow = 0;
-       cfs_spin_unlock(&d->opd_pre_lock);
+       spin_unlock(&d->opd_pre_lock);
 
-       CDEBUG(D_HA, "Got last_id "LPU64" from OST, last_used is "LPU64
-              ", next "LPU64"\n", body->oa.o_id,
-              le64_to_cpu(d->opd_last_used_id), d->opd_pre_next);
+       CDEBUG(D_HA, "%s: Got last_id "LPU64" from OST, last_used is "LPU64
+              ", pre_used "LPU64"\n", d->opd_obd->obd_name, body->oa.o_id,
+              le64_to_cpu(d->opd_last_used_id), d->opd_pre_used_id);
 
 out:
        if (req)
                ptlrpc_req_finished(req);
 
+       d->opd_pre_recovering = 0;
+
+       /*
+        * If rc is zero, the pre-creation window should have been emptied.
+        * Since waking up the herd would be useless without pre-created
+        * objects, we defer the signal to osp_precreate_send() in that case.
+        */
+       if (rc != 0) {
+               if (update_status) {
+                       CERROR("%s: cannot cleanup orphans: rc = %d\n",
+                              d->opd_obd->obd_name, rc);
+                       /* we can't proceed from here, the OST seems to
+                        * be in bad shape, better to wait for
+                        * a new instance of the server and repeat
+                        * from the beginning. notify possible waiters
+                        * this OSP isn't quite functional yet */
+                       osp_pre_update_status(d, rc);
+               } else {
+                       cfs_waitq_signal(&d->opd_pre_user_waitq);
+               }
+       }
+
        RETURN(rc);
 }
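
On the success path the wake-up is deferred, as the comment above says, to osp_precreate_send(); its out_req path (whose comment appears near the top of this diff) is where waiters are eventually released. A minimal sketch of that site, assuming the usual cfs_waitq_signal() call, which is not itself visible in any hunk here:

out_req:
        /* precreate reply handled: wake the users parked in
         * osp_precreate_reserve() now that the pool state has changed */
        cfs_waitq_signal(&d->opd_pre_user_waitq);
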
 
@@ -471,6 +522,20 @@ void osp_pre_update_status(struct osp_device *d, int rc)
        if (rc)
                goto out;
 
+       /* Add a bit of hysteresis so this flag isn't continually flapping,
+        * and ensure that new files don't get extremely fragmented due to
+        * only a small amount of available space in the filesystem.
+        * We want to set the NOSPC flag when there is less than ~0.1% free
+        * and clear it when there is at least ~0.2% free space, so:
+        *                   avail < ~0.1% max          max = avail + used
+        *            1025 * avail < avail + used       used = blocks - free
+        *            1024 * avail < used
+        *            1024 * avail < blocks - free
+        *                   avail < ((blocks - free) >> 10)
+        *
+        * On a very large disk, say 16 TB, 0.1% would be 16 GB. We don't
+        * want to lose that much space, so in those cases we report no
+        * space left if there is less than 1 GB available.              */
        if (likely(msfs->os_type)) {
                used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10,
                                    1 << 30);
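
The derivation above reduces to a single shifted comparison. A standalone sketch of that test follows, not part of the patch: the >>10 scaling and the cap come from the hunk above, while comparing the result against os_bavail is an assumption about the rest of osp_pre_update_status(), which this hunk does not show.

static int osp_space_is_low(const struct obd_statfs *msfs)
{
        /* ~0.1% of the space in use: avail < (blocks - free) >> 10,
         * capped so a huge filesystem doesn't hold back too much */
        __u64 used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10,
                           1 << 30);

        /* assumed shape of the check done by osp_pre_update_status() */
        return msfs->os_bavail < used;
}
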
@@ -516,9 +581,9 @@ static int osp_precreate_thread(void *_arg)
        sprintf(pname, "osp-pre-%u\n", d->opd_index);
        cfs_daemonize(pname);
 
-       cfs_spin_lock(&d->opd_pre_lock);
+       spin_lock(&d->opd_pre_lock);
        thread->t_flags = SVC_RUNNING;
-       cfs_spin_unlock(&d->opd_pre_lock);
+       spin_unlock(&d->opd_pre_lock);
        cfs_waitq_signal(&thread->t_ctl_waitq);
 
        while (osp_precreate_running(d)) {
@@ -545,37 +610,11 @@ static int osp_precreate_thread(void *_arg)
                osp_statfs_update(d);
 
                /*
-                * wait for local recovery to finish, so we can cleanup orphans
-                * orphans are all objects since "last used" (assigned), but
-                * there might be objects reserved and in some cases they won't
-                * be used. we can't cleanup them till we're sure they won't be
-                * used. so we block new reservations and wait till all reserved
-                * objects either user or released.
+                * Clean up orphans or recreate missing objects.
                 */
-               l_wait_event(d->opd_pre_waitq, (!d->opd_pre_reserved &&
-                                               d->opd_recovery_completed) ||
-                            !osp_precreate_running(d) ||
-                            d->opd_got_disconnected, &lwi);
-
-               if (osp_precreate_running(d) && !d->opd_got_disconnected) {
-                       rc = osp_precreate_cleanup_orphans(d);
-                       if (rc) {
-                               CERROR("%s: cannot cleanup orphans: rc = %d\n",
-                                      d->opd_obd->obd_name,  rc);
-                               /* we can't proceed from here, OST seem to
-                                * be in a bad shape, better to wait for
-                                * a new instance of the server and repeat
-                                * from the beginning. notify possible waiters
-                                * this OSP isn't quite functional yet */
-                               osp_pre_update_status(d, rc);
-                               cfs_waitq_signal(&d->opd_pre_user_waitq);
-                               l_wait_event(d->opd_pre_waitq,
-                                            !osp_precreate_running(d) ||
-                                            d->opd_new_connection, &lwi);
-                               continue;
-
-                       }
-               }
+               rc = osp_precreate_cleanup_orphans(d);
+               if (rc != 0)
+                       continue;
 
                /*
                 * connected, can handle precreates now
@@ -620,8 +659,15 @@ static int osp_precreate_thread(void *_arg)
 
 static int osp_precreate_ready_condition(struct osp_device *d)
 {
+       __u64 next;
+
+       if (d->opd_pre_recovering)
+               return 0;
+
        /* ready if got enough precreated objects */
-       if (d->opd_pre_next + d->opd_pre_reserved < d->opd_pre_last_created)
+       /* we need to wait for others (opd_pre_reserved) and our object (+1) */
+       next = d->opd_pre_used_id + d->opd_pre_reserved + 1;
+       if (next <= d->opd_pre_last_created)
                return 1;
 
        /* ready if OST reported no space and no destroys in progress */
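
A sketch of how this condition is presumably consumed in osp_precreate_reserve(), pairing it with opd_pre_user_waitq and the timeout handler from the hunk below; the exact wait call is not visible in this diff. While opd_pre_recovering is set the condition stays false, so reservers sleep here until orphan cleanup finishes and the pool is repopulated:

        struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(obd_timeout),
                                             osp_precreate_timeout_condition,
                                             d);

        l_wait_event(d->opd_pre_user_waitq,
                     osp_precreate_ready_condition(d), &lwi);
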
@@ -640,7 +686,7 @@ static int osp_precreate_timeout_condition(void *data)
                      "reserved="LPU64", syn_changes=%lu, "
                      "syn_rpc_in_progress=%d, status=%d\n",
                      d->opd_obd->obd_name, d->opd_pre_last_created,
-                     d->opd_pre_next, d->opd_pre_reserved,
+                     d->opd_pre_used_id, d->opd_pre_reserved,
                      d->opd_syn_changes, d->opd_syn_rpc_in_progress,
                      d->opd_pre_status);
 
@@ -663,7 +709,7 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
 
        ENTRY;
 
-       LASSERT(d->opd_pre_last_created >= d->opd_pre_next);
+       LASSERT(d->opd_pre_last_created >= d->opd_pre_used_id);
 
        lwi = LWI_TIMEOUT(cfs_time_seconds(obd_timeout),
                          osp_precreate_timeout_condition, d);
@@ -681,36 +727,36 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
                                break;
                }
 
-#if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 90, 0)
-#error "remove this before the release"
-#endif
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 90, 0)
                /*
                 * to address Andreas's concern on possible busy-loop
                 * between this thread and osp_precreate_send()
                 */
-               LASSERT(count++ < 1000);
+               if (unlikely(count++ == 1000)) {
+                       osp_precreate_timeout_condition(d);
+                       LBUG();
+               }
+#endif
 
                /*
                 * increase number of precreations
                 */
                if (d->opd_pre_grow_count < d->opd_pre_max_grow_count &&
                    d->opd_pre_grow_slow == 0 &&
-                   (d->opd_pre_last_created - d->opd_pre_next <=
+                   (d->opd_pre_last_created - d->opd_pre_used_id <=
                     d->opd_pre_grow_count / 4 + 1)) {
-                       cfs_spin_lock(&d->opd_pre_lock);
+                       spin_lock(&d->opd_pre_lock);
                        d->opd_pre_grow_slow = 1;
                        d->opd_pre_grow_count *= 2;
-                       cfs_spin_unlock(&d->opd_pre_lock);
+                       spin_unlock(&d->opd_pre_lock);
                }
 
-               /*
-                * we never use the last object in the window
-                */
-               cfs_spin_lock(&d->opd_pre_lock);
-               precreated = d->opd_pre_last_created - d->opd_pre_next;
-               if (precreated > d->opd_pre_reserved) {
+               spin_lock(&d->opd_pre_lock);
+               precreated = d->opd_pre_last_created - d->opd_pre_used_id;
+               if (precreated > d->opd_pre_reserved &&
+                   !d->opd_pre_recovering) {
                        d->opd_pre_reserved++;
-                       cfs_spin_unlock(&d->opd_pre_lock);
+                       spin_unlock(&d->opd_pre_lock);
                        rc = 0;
 
                        /* XXX: don't wake up if precreation is in progress */
@@ -719,7 +765,7 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
 
                        break;
                }
-               cfs_spin_unlock(&d->opd_pre_lock);
+               spin_unlock(&d->opd_pre_lock);
 
                /*
                 * all precreated objects have been used and no-space
@@ -761,16 +807,16 @@ __u64 osp_precreate_get_id(struct osp_device *d)
        obd_id objid;
 
        /* grab next id from the pool */
-       cfs_spin_lock(&d->opd_pre_lock);
-       LASSERT(d->opd_pre_next <= d->opd_pre_last_created);
-       objid = d->opd_pre_next++;
+       spin_lock(&d->opd_pre_lock);
+       LASSERT(d->opd_pre_used_id < d->opd_pre_last_created);
+       objid = ++d->opd_pre_used_id;
        d->opd_pre_reserved--;
        /*
         * last_used_id must be changed along with getting a new id, otherwise
         * we might miscalculate the gap, causing object loss or leak
         */
        osp_update_last_id(d, objid);
-       cfs_spin_unlock(&d->opd_pre_lock);
+       spin_unlock(&d->opd_pre_lock);
 
        /*
         * probably main thread suspended orphan cleanup till
@@ -862,8 +908,8 @@ int osp_init_precreate(struct osp_device *d)
 
        /* initially precreation isn't ready */
        d->opd_pre_status = -EAGAIN;
-       d->opd_pre_next = 1;
-       d->opd_pre_last_created = 1;
+       d->opd_pre_used_id = 0;
+       d->opd_pre_last_created = 0;
        d->opd_pre_reserved = 0;
        d->opd_got_disconnected = 1;
        d->opd_pre_grow_slow = 0;
@@ -871,7 +917,7 @@ int osp_init_precreate(struct osp_device *d)
        d->opd_pre_min_grow_count = OST_MIN_PRECREATE;
        d->opd_pre_max_grow_count = OST_MAX_PRECREATE;
 
-       cfs_spin_lock_init(&d->opd_pre_lock);
+       spin_lock_init(&d->opd_pre_lock);
        cfs_waitq_init(&d->opd_pre_waitq);
        cfs_waitq_init(&d->opd_pre_user_waitq);
        cfs_waitq_init(&d->opd_pre_thread.t_ctl_waitq);