Whamcloud - gitweb
LU-1952 tests: fix sanity test_51b on small MDTs
[fs/lustre-release.git] / lustre / osp / osp_precreate.c
index 49ea453..53cfef9 100644 (file)
@@ -312,6 +312,56 @@ out_req:
        RETURN(rc);
 }
 
+
+static int osp_get_lastid_from_ost(struct osp_device *d)
+{
+       struct ptlrpc_request   *req;
+       struct obd_import       *imp;
+       obd_id                  *reply;
+       char                    *tmp;
+       int                      rc;
+
+       imp = d->opd_obd->u.cli.cl_import;
+       LASSERT(imp);
+
+       req = ptlrpc_request_alloc(imp, &RQF_OST_GET_INFO_LAST_ID);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
+                            RCL_CLIENT, sizeof(KEY_LAST_ID));
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+       memcpy(tmp, KEY_LAST_ID, sizeof(KEY_LAST_ID));
+
+       req->rq_no_delay = req->rq_no_resend = 1;
+       ptlrpc_request_set_replen(req);
+       rc = ptlrpc_queue_wait(req);
+       if (rc) {
+               /* bad-bad OST.. let sysadm sort this out */
+               ptlrpc_set_import_active(imp, 0);
+               GOTO(out, rc);
+       }
+
+       reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
+       if (reply == NULL)
+               GOTO(out, rc = -EPROTO);
+
+       d->opd_last_used_id = *reply;
+       CDEBUG(D_HA, "%s: got last_id "LPU64" from OST\n",
+              d->opd_obd->obd_name, d->opd_last_used_id);
+
+out:
+       ptlrpc_req_finished(req);
+       RETURN(rc);
+
+}
+
 /**
  * asks OST to clean precreate orphans
  * and gets next id for new objects
@@ -328,22 +378,32 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        LASSERT(d->opd_recovery_completed);
        LASSERT(d->opd_pre_reserved == 0);
 
+       CDEBUG(D_HA, "%s: going to cleanup orphans since "LPU64"\n",
+               d->opd_obd->obd_name, d->opd_last_used_id);
+
+       if (d->opd_last_used_id < 2) {
+               /* lastid looks strange... ask OST */
+               rc = osp_get_lastid_from_ost(d);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
        imp = d->opd_obd->u.cli.cl_import;
        LASSERT(imp);
 
        req = ptlrpc_request_alloc(imp, &RQF_OST_CREATE);
        if (req == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out, rc = -ENOMEM);
 
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
-               RETURN(rc);
+               GOTO(out, rc);
        }
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
-               GOTO(out_req, rc = -EPROTO);
+               GOTO(out, rc = -EPROTO);
 
        body->oa.o_flags = OBD_FL_DELORPHAN;
        body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
@@ -359,11 +419,11 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
 
        rc = ptlrpc_queue_wait(req);
        if (rc)
-               GOTO(out_req, rc);
+               GOTO(out, rc);
 
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
-               GOTO(out_req, rc = -EPROTO);
+               GOTO(out, rc = -EPROTO);
 
        /*
         * OST provides us with id new pool starts from in body->oa.o_id
@@ -382,16 +442,14 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        d->opd_pre_grow_slow = 0;
        cfs_spin_unlock(&d->opd_pre_lock);
 
-       /* now we can wakeup all users awaiting for objects */
-       osp_pre_update_status(d, rc);
-       cfs_waitq_signal(&d->opd_pre_user_waitq);
-
        CDEBUG(D_HA, "Got last_id "LPU64" from OST, last_used is "LPU64
               ", next "LPU64"\n", body->oa.o_id,
               le64_to_cpu(d->opd_last_used_id), d->opd_pre_next);
 
-out_req:
-       ptlrpc_req_finished(req);
+out:
+       if (req)
+               ptlrpc_req_finished(req);
+
        RETURN(rc);
 }
 
@@ -425,6 +483,9 @@ void osp_pre_update_status(struct osp_device *d, int rc)
                                       d->opd_obd->obd_name, msfs->os_blocks,
                                       msfs->os_bfree, used, msfs->os_bavail,
                                       d->opd_pre_status, rc);
+                       CDEBUG(D_INFO,
+                              "non-commited changes: %lu, in progress: %u\n",
+                              d->opd_syn_changes, d->opd_syn_rpc_in_progress);
                } else if (old == -ENOSPC) {
                        d->opd_pre_status = 0;
                        d->opd_pre_grow_slow = 0;
@@ -501,6 +562,18 @@ static int osp_precreate_thread(void *_arg)
                        if (rc) {
                                CERROR("%s: cannot cleanup orphans: rc = %d\n",
                                       d->opd_obd->obd_name,  rc);
+                               /* we can't proceed from here, OST seem to
+                                * be in a bad shape, better to wait for
+                                * a new instance of the server and repeat
+                                * from the beginning. notify possible waiters
+                                * this OSP isn't quite functional yet */
+                               osp_pre_update_status(d, rc);
+                               cfs_waitq_signal(&d->opd_pre_user_waitq);
+                               l_wait_event(d->opd_pre_waitq,
+                                            !osp_precreate_running(d) ||
+                                            d->opd_new_connection, &lwi);
+                               continue;
+
                        }
                }
 
@@ -551,8 +624,9 @@ static int osp_precreate_ready_condition(struct osp_device *d)
        if (d->opd_pre_next + d->opd_pre_reserved < d->opd_pre_last_created)
                return 1;
 
-       /* ready if OST reported no space */
-       if (d->opd_pre_status != 0)
+       /* ready if OST reported no space and no destoys in progress */
+       if (d->opd_syn_changes + d->opd_syn_rpc_in_progress == 0 &&
+           d->opd_pre_status != 0)
                return 1;
 
        return 0;
@@ -563,9 +637,11 @@ static int osp_precreate_timeout_condition(void *data)
        struct osp_device *d = data;
 
        LCONSOLE_WARN("%s: slow creates, last="LPU64", next="LPU64", "
-                     "reserved="LPU64", status=%d\n",
+                     "reserved="LPU64", syn_changes=%lu, "
+                     "syn_rpc_in_progress=%d, status=%d\n",
                      d->opd_obd->obd_name, d->opd_pre_last_created,
                      d->opd_pre_next, d->opd_pre_reserved,
+                     d->opd_syn_changes, d->opd_syn_rpc_in_progress,
                      d->opd_pre_status);
 
        return 0;
@@ -583,6 +659,7 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
        struct l_wait_info       lwi;
        cfs_time_t               expire = cfs_time_shift(obd_timeout);
        int                      precreated, rc;
+       int                      count = 0;
 
        ENTRY;
 
@@ -604,6 +681,15 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
                                break;
                }
 
+#if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 90, 0)
+#error "remove this before the release"
+#endif
+               /*
+                * to address Andreas's concern on possible busy-loop
+                * between this thread and osp_precreate_send()
+                */
+               LASSERT(count++ < 1000);
+
                /*
                 * increase number of precreations
                 */
@@ -635,6 +721,28 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
                }
                cfs_spin_unlock(&d->opd_pre_lock);
 
+               /*
+                * all precreated objects have been used and no-space
+                * status leave us no chance to succeed very soon
+                * but if there is destroy in progress, then we should
+                * wait till that is done - some space might be released
+                */
+               if (unlikely(rc == -ENOSPC)) {
+                       if (d->opd_syn_changes) {
+                               /* force local commit to release space */
+                               dt_commit_async(env, d->opd_storage);
+                       }
+                       if (d->opd_syn_rpc_in_progress) {
+                               /* just wait till destroys are done */
+                               /* see l_wait_even() few lines below */
+                       }
+                       if (d->opd_syn_changes +
+                           d->opd_syn_rpc_in_progress == 0) {
+                               /* no hope for free space */
+                               break;
+                       }
+               }
+
                /* XXX: don't wake up if precreation is in progress */
                cfs_waitq_signal(&d->opd_pre_waitq);