Whamcloud - gitweb
LU-1303 mds: integration lod/osp into the stack
[fs/lustre-release.git] / lustre / osp / osp_precreate.c
index dfcb2a2..53cfef9 100644 (file)
@@ -312,6 +312,56 @@ out_req:
        RETURN(rc);
 }
 
+
+static int osp_get_lastid_from_ost(struct osp_device *d)
+{
+       struct ptlrpc_request   *req;
+       struct obd_import       *imp;
+       obd_id                  *reply;
+       char                    *tmp;
+       int                      rc;
+
+       imp = d->opd_obd->u.cli.cl_import;
+       LASSERT(imp);
+
+       req = ptlrpc_request_alloc(imp, &RQF_OST_GET_INFO_LAST_ID);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
+                            RCL_CLIENT, sizeof(KEY_LAST_ID));
+       rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+       memcpy(tmp, KEY_LAST_ID, sizeof(KEY_LAST_ID));
+
+       req->rq_no_delay = req->rq_no_resend = 1;
+       ptlrpc_request_set_replen(req);
+       rc = ptlrpc_queue_wait(req);
+       if (rc) {
+               /* bad-bad OST.. let sysadm sort this out */
+               ptlrpc_set_import_active(imp, 0);
+               GOTO(out, rc);
+       }
+
+       reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
+       if (reply == NULL)
+               GOTO(out, rc = -EPROTO);
+
+       d->opd_last_used_id = *reply;
+       CDEBUG(D_HA, "%s: got last_id "LPU64" from OST\n",
+              d->opd_obd->obd_name, d->opd_last_used_id);
+
+out:
+       ptlrpc_req_finished(req);
+       RETURN(rc);
+
+}
+
 /**
  * asks OST to clean precreate orphans
  * and gets next id for new objects
@@ -328,22 +378,32 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        LASSERT(d->opd_recovery_completed);
        LASSERT(d->opd_pre_reserved == 0);
 
+       CDEBUG(D_HA, "%s: going to cleanup orphans since "LPU64"\n",
+               d->opd_obd->obd_name, d->opd_last_used_id);
+
+       if (d->opd_last_used_id < 2) {
+               /* lastid looks strange... ask OST */
+               rc = osp_get_lastid_from_ost(d);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
        imp = d->opd_obd->u.cli.cl_import;
        LASSERT(imp);
 
        req = ptlrpc_request_alloc(imp, &RQF_OST_CREATE);
        if (req == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out, rc = -ENOMEM);
 
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
        if (rc) {
                ptlrpc_request_free(req);
-               RETURN(rc);
+               GOTO(out, rc);
        }
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
-               GOTO(out_req, rc = -EPROTO);
+               GOTO(out, rc = -EPROTO);
 
        body->oa.o_flags = OBD_FL_DELORPHAN;
        body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
@@ -358,14 +418,12 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        req->rq_no_resend = req->rq_no_delay = 1;
 
        rc = ptlrpc_queue_wait(req);
-       if (rc) {
-               ptlrpc_set_import_active(imp, 0);
-               GOTO(out_req, rc);
-       }
+       if (rc)
+               GOTO(out, rc);
 
        body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
-               GOTO(out_req, rc = -EPROTO);
+               GOTO(out, rc = -EPROTO);
 
        /*
         * OST provides us with id new pool starts from in body->oa.o_id
@@ -384,16 +442,14 @@ static int osp_precreate_cleanup_orphans(struct osp_device *d)
        d->opd_pre_grow_slow = 0;
        cfs_spin_unlock(&d->opd_pre_lock);
 
-       /* now we can wakeup all users awaiting for objects */
-       osp_pre_update_status(d, rc);
-       cfs_waitq_signal(&d->opd_pre_user_waitq);
-
        CDEBUG(D_HA, "Got last_id "LPU64" from OST, last_used is "LPU64
               ", next "LPU64"\n", body->oa.o_id,
               le64_to_cpu(d->opd_last_used_id), d->opd_pre_next);
 
-out_req:
-       ptlrpc_req_finished(req);
+out:
+       if (req)
+               ptlrpc_req_finished(req);
+
        RETURN(rc);
 }
 
@@ -506,6 +562,18 @@ static int osp_precreate_thread(void *_arg)
                        if (rc) {
                                CERROR("%s: cannot cleanup orphans: rc = %d\n",
                                       d->opd_obd->obd_name,  rc);
+                               /* we can't proceed from here, OST seem to
+                                * be in a bad shape, better to wait for
+                                * a new instance of the server and repeat
+                                * from the beginning. notify possible waiters
+                                * this OSP isn't quite functional yet */
+                               osp_pre_update_status(d, rc);
+                               cfs_waitq_signal(&d->opd_pre_user_waitq);
+                               l_wait_event(d->opd_pre_waitq,
+                                            !osp_precreate_running(d) ||
+                                            d->opd_new_connection, &lwi);
+                               continue;
+
                        }
                }
 
@@ -591,6 +659,7 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
        struct l_wait_info       lwi;
        cfs_time_t               expire = cfs_time_shift(obd_timeout);
        int                      precreated, rc;
+       int                      count = 0;
 
        ENTRY;
 
@@ -612,6 +681,15 @@ int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
                                break;
                }
 
+#if LUSTRE_VERSION_CODE >= OBD_OCD_VERSION(2, 3, 90, 0)
+#error "remove this before the release"
+#endif
+               /*
+                * to address Andreas's concern on possible busy-loop
+                * between this thread and osp_precreate_send()
+                */
+               LASSERT(count++ < 1000);
+
                /*
                 * increase number of precreations
                 */