Whamcloud - gitweb
LU-2154 osp: precreate logic to use last assigned id
[fs/lustre-release.git] / lustre / osp / osp_precreate.c
index dfcb2a2..7f85a20 100644 (file)
@@ -148,16 +148,15 @@ static int osp_statfs_update(struct osp_device *d)
        aa = ptlrpc_req_async_args(req);
        aa->pointer_arg[0] = d;
 
-       ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
-
-       cfs_timer_disarm(&d->opd_statfs_timer);
-
        /*
         * no updates till reply
         */
+       cfs_timer_disarm(&d->opd_statfs_timer);
        d->opd_statfs_fresh_till = cfs_time_shift(obd_timeout * 1000);
        d->opd_statfs_update_in_progress = 1;
 
+       ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+
        RETURN(0);
 }
 
@@ -200,7 +199,7 @@ static inline int osp_precreate_stopped(struct osp_device *d)
 
 static inline int osp_precreate_near_empty_nolock(struct osp_device *d)
 {
-       int window = d->opd_pre_last_created - d->opd_pre_next;
+       int window = d->opd_pre_last_created - d->opd_pre_used_id;
 
        /* don't consider new precreation till OST is healty and
         * has free space */
@@ -282,7 +281,7 @@ static int osp_precreate_send(struct osp_device *d)
                GOTO(out_req, rc = -EPROTO);
 
        CDEBUG(D_HA, "new last_created %lu\n", (unsigned long) body->oa.o_id);
-       LASSERT(body->oa.o_id > d->opd_pre_next);
+       LASSERT(body->oa.o_id > d->opd_pre_used_id);
 
        diff = body->oa.o_id - d->opd_pre_last_created;
 
@@ -301,7 +300,7 @@ static int osp_precreate_send(struct osp_device *d)
        d->opd_pre_last_created = body->oa.o_id;
        cfs_spin_unlock(&d->opd_pre_lock);
        CDEBUG(D_OTHER, "current precreated pool: %llu-%llu\n",
-              d->opd_pre_next, d->opd_pre_last_created);
+              d->opd_pre_used_id, d->opd_pre_last_created);
 
 out_req:
        /* now we can wakeup all users awaiting for objects */
@@ -312,6 +311,56 @@ out_req:
        RETURN(rc);
 }
 
+
+/**
+ * asks OST for its last id (OST_GET_INFO with the KEY_LAST_ID key)
+ * and stores the answer in d->opd_last_used_id
+ *
+ * the request is sent synchronously with rq_no_delay and rq_no_resend set.
+ * if sending fails, the import is deactivated with
+ * ptlrpc_set_import_active(imp, 0) before the error is returned.
+ *
+ * returns 0 once the id is stored, -ENOMEM if the request can't be
+ * allocated, -EPROTO if the reply has no RMF_OBD_ID field, otherwise the
+ * non-zero code from ptlrpc_request_pack() or ptlrpc_queue_wait()
+ */
+static int osp_get_lastid_from_ost(struct osp_device *d)
+{
+       struct ptlrpc_request   *req;
+       struct obd_import       *imp;
+       obd_id                  *reply;
+       char                    *tmp;
+       int                      rc;
+
+       /* pairs with the RETURN()/GOTO() exits below */
+       ENTRY;
+
+       imp = d->opd_obd->u.cli.cl_import;
+       LASSERT(imp);
+
+       req = ptlrpc_request_alloc(imp, &RQF_OST_GET_INFO_LAST_ID);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       /* the key buffer must be sized before the request is packed */
+       req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
+                            RCL_CLIENT, sizeof(KEY_LAST_ID));
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
+       if (rc) {
+               /* not packed yet, so it is released with _free() */
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       /* same sizeof() as above, so the copy always fits the buffer */
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+       memcpy(tmp, KEY_LAST_ID, sizeof(KEY_LAST_ID));
+
+       req->rq_no_delay = req->rq_no_resend = 1;
+       ptlrpc_request_set_replen(req);
+       rc = ptlrpc_queue_wait(req);
+       if (rc) {
+               /* bad-bad OST.. let sysadm sort this out */
+               ptlrpc_set_import_active(imp, 0);
+               GOTO(out, rc);
+       }
+
+       reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
+       if (reply == NULL)
+               GOTO(out, rc = -EPROTO);
+
+       /* rc is 0 here: the id is stored and success is returned */
+       d->opd_last_used_id = *reply;
+       CDEBUG(D_HA, "%s: got last_id "LPU64" from OST\n",
+              d->opd_obd->obd_name, d->opd_last_used_id);
+
+out:
+       ptlrpc_req_finished(req);
+       RETURN(rc);
+}
+
 /**
  * asks OST to clean precreate orphans
  * and gets next id for new objects
@@ -328,22 +377,32 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        LASSERT(d->opd_recovery_completed);
        LASSERT(d->opd_pre_reserved == 0);
 
+       CDEBUG(D_HA, "%s: going to cleanup orphans since "LPU64"\n",
+               d->opd_obd->obd_name, d->opd_last_used_id);
+
+       if (d->opd_last_used_id < 2) {
+               /* lastid looks strange... ask OST */
+               rc = osp_get_lastid_from_ost(d);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
        imp = d->opd_obd->u.cli.cl_import;
        LASSERT(imp);
 
        req = ptlrpc_request_alloc(imp, &RQF_OST_CREATE);
        if (req == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out, rc = -ENOMEM);
 
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
-               RETURN(rc);
+               GOTO(out, rc);
        }
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
-               GOTO(out_req, rc = -EPROTO);
+               GOTO(out, rc = -EPROTO);
 
        body->oa.o_flags = OBD_FL_DELORPHAN;
        body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
@@ -358,14 +417,12 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        req->rq_no_resend = req->rq_no_delay = 1;
 
        rc = ptlrpc_queue_wait(req);
-       if (rc) {
-               ptlrpc_set_import_active(imp, 0);
-               GOTO(out_req, rc);
-       }
+       if (rc)
+               GOTO(out, rc);
 
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
-               GOTO(out_req, rc = -EPROTO);
+               GOTO(out, rc = -EPROTO);
 
        /*
         * OST provides us with id new pool starts from in body->oa.o_id
@@ -380,20 +437,18 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
                d->opd_pre_grow_count = OST_MIN_PRECREATE;
                d->opd_pre_last_created = body->oa.o_id + 1;
        }
-       d->opd_pre_next = d->opd_pre_last_created;
+       d->opd_pre_used_id = d->opd_pre_last_created - 1;
        d->opd_pre_grow_slow = 0;
        cfs_spin_unlock(&d->opd_pre_lock);
 
-       /* now we can wakeup all users awaiting for objects */
-       osp_pre_update_status(d, rc);
-       cfs_waitq_signal(&d->opd_pre_user_waitq);
-
        CDEBUG(D_HA, "Got last_id "LPU64" from OST, last_used is "LPU64
               ", next "LPU64"\n", body->oa.o_id,
-              le64_to_cpu(d->opd_last_used_id), d->opd_pre_next);
+              le64_to_cpu(d->opd_last_used_id), d->opd_pre_used_id);
+
+out:
+       if (req)
+               ptlrpc_req_finished(req);
 
-out_req:
-       ptlrpc_req_finished(req);
        RETURN(rc);
 }
 
@@ -506,6 +561,18 @@ static int osp_precreate_thread(void *_arg)
                        if (rc) {
                                CERROR("%s: cannot cleanup orphans: rc = %d\n",
                                       d->opd_obd->obd_name,  rc);
+                               /* we can't proceed from here: the OST seems
+                                * to be in bad shape, so it is better to wait
+                                * for a new instance of the server and repeat
+                                * from the beginning. notify possible waiters
+                                * that this OSP isn't quite functional yet */
+                               osp_pre_update_status(d, rc);
+                               cfs_waitq_signal(&d->opd_pre_user_waitq);
+                               l_wait_event(d->opd_pre_waitq,
+                                            !osp_precreate_running(d) ||
+                                            d->opd_new_connection, &lwi);
+                               continue;
+
                        }
                }
 
@@ -552,8 +619,12 @@ static int osp_precreate_thread(void *_arg)
 
 static int osp_precreate_ready_condition(struct osp_device *d)
 {
+       __u64 next;
+
        /* ready if got enough precreated objects */
-       if (d->opd_pre_next + d->opd_pre_reserved < d->opd_pre_last_created)
+       /* we need to wait for others (opd_pre_reserved) and our object (+1) */
+       next = d->opd_pre_used_id + d->opd_pre_reserved + 1;
+       if (next <= d->opd_pre_last_created)
                return 1;
 
        /* ready if OST reported no space and no destoys in progress */
@@ -572,7 +643,7 @@ static int osp_precreate_timeout_condition(void *data)
                      "reserved="LPU64", syn_changes=%lu, "
                      "syn_rpc_in_progress=%d, status=%d\n",
                      d->opd_obd->obd_name, d->opd_pre_last_created,
-                     d->opd_pre_next, d->opd_pre_reserved,
+                     d->opd_pre_used_id, d->opd_pre_reserved,
                      d->opd_syn_changes, d->opd_syn_rpc_in_progress,
                      d->opd_pre_status);
 
@@ -591,10 +662,11 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
        struct l_wait_info       lwi;
        cfs_time_t               expire = cfs_time_shift(obd_timeout);
        int                      precreated, rc;
+       int                      count = 0;
 
        ENTRY;
 
-       LASSERT(d->opd_pre_last_created >= d->opd_pre_next);
+       LASSERT(d->opd_pre_last_created >= d->opd_pre_used_id);
 
        lwi = LWI_TIMEOUT(cfs_time_seconds(obd_timeout),
                          osp_precreate_timeout_condition, d);
@@ -612,12 +684,23 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
                                break;
                }
 
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 90, 0)
+               /*
+                * to address Andreas's concern on possible busy-loop
+                * between this thread and osp_precreate_send()
+                */
+               if (unlikely(count++ == 1000)) {
+                       osp_precreate_timeout_condition(d);
+                       LBUG();
+               }
+#endif
+
                /*
                 * increase number of precreations
                 */
                if (d->opd_pre_grow_count < d->opd_pre_max_grow_count &&
                    d->opd_pre_grow_slow == 0 &&
-                   (d->opd_pre_last_created - d->opd_pre_next <=
+                   (d->opd_pre_last_created - d->opd_pre_used_id <=
                     d->opd_pre_grow_count / 4 + 1)) {
                        cfs_spin_lock(&d->opd_pre_lock);
                        d->opd_pre_grow_slow = 1;
@@ -629,7 +712,7 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
                 * we never use the last object in the window
                 */
                cfs_spin_lock(&d->opd_pre_lock);
-               precreated = d->opd_pre_last_created - d->opd_pre_next;
+               precreated = d->opd_pre_last_created - d->opd_pre_used_id;
                if (precreated > d->opd_pre_reserved) {
                        d->opd_pre_reserved++;
                        cfs_spin_unlock(&d->opd_pre_lock);
@@ -684,8 +767,8 @@ __u64 osp_precreate_get_id(struct osp_device *d)
 
        /* grab next id from the pool */
        cfs_spin_lock(&d->opd_pre_lock);
-       LASSERT(d->opd_pre_next <= d->opd_pre_last_created);
-       objid = d->opd_pre_next++;
+       LASSERT(d->opd_pre_used_id < d->opd_pre_last_created);
+       objid = ++d->opd_pre_used_id;
        d->opd_pre_reserved--;
        /*
         * last_used_id must be changed along with getting new id otherwise
@@ -784,8 +867,8 @@ int osp_init_precreate(struct osp_device *d)
 
        /* initially precreation isn't ready */
        d->opd_pre_status = -EAGAIN;
-       d->opd_pre_next = 1;
-       d->opd_pre_last_created = 1;
+       d->opd_pre_used_id = 0;
+       d->opd_pre_last_created = 0;
        d->opd_pre_reserved = 0;
        d->opd_got_disconnected = 1;
        d->opd_pre_grow_slow = 0;