Whamcloud - gitweb
Add async create into HEAD (port from 1.x)
[fs/lustre-release.git] / lustre / osc / osc_create.c
index a851c35..53d6912 100644 (file)
 # include <ctype.h>
 #endif
 
-# include <lustre_dlm.h>
+#include <lustre_dlm.h>
 #include <obd_class.h>
 #include "osc_internal.h"
 
+/* XXX: should this be adjusted for adaptive timeouts (AT)? */
+#define osc_create_timeout      (obd_timeout / 2)
+
+/* Per-request state of an asynchronous create.  osc_create_async() fills it
+ * into the fake request's async args and handle_async_create() reads it back
+ * through ptlrpc_req_async_args(). */
+struct osc_create_async_args {
+        struct osc_creator      *rq_oscc;       /* creator whose id range is used */
+        struct lov_stripe_md    *rq_lsm;        /* gets lsm_object_id on success */
+        struct obd_info         *rq_oinfo;      /* supplies oi_oa and oi_cb_up */
+};
+
+static int oscc_internal_create(struct osc_creator *oscc);
+static int handle_async_create(struct ptlrpc_request *req, int rc);
+
 static int osc_interpret_create(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data, int rc)
 {
         struct osc_creator *oscc;
         struct ost_body *body = NULL;
+        struct ptlrpc_request *fake_req, *pos;
         ENTRY;
 
         if (req->rq_repmsg) {
@@ -110,11 +123,6 @@ static int osc_interpret_create(const struct lu_env *env,
                 spin_unlock(&oscc->oscc_lock);
                 break;
         }
-        case -EAGAIN:
-                /* valid race delorphan vs create, or somthing after resend */
-                spin_unlock(&oscc->oscc_lock);
-                DEBUG_REQ(D_INODE, req, "Got EAGAIN - resend \n");
-                break;
         case -ENOSPC:
         case -EROFS:
         case -EFBIG: {
@@ -135,6 +143,15 @@ static int osc_interpret_create(const struct lu_env *env,
                 spin_unlock(&oscc->oscc_lock);
                 break;
         }
+        case -EWOULDBLOCK: {
+                /* aka EAGAIN we should not delay create if import failed -
+                 * this avoid client stick in create and avoid race with
+                 * delorphan */
+                oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+                /* oscc->oscc_grow_count = OST_MIN_PRECREATE; */
+                spin_unlock(&oscc->oscc_lock);
+                break;
+        }
         default: {
                 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                 oscc->oscc_grow_count = OST_MIN_PRECREATE;
@@ -149,6 +166,19 @@ static int osc_interpret_create(const struct lu_env *env,
         CDEBUG(D_HA, "preallocated through id "LPU64" (next to use "LPU64")\n",
                oscc->oscc_last_id, oscc->oscc_next_id);
 
+        spin_lock(&oscc->oscc_lock);
+        list_for_each_entry_safe(fake_req, pos,
+                                 &oscc->oscc_wait_create_list, rq_list) {
+                if (handle_async_create(fake_req, rc)  == -EAGAIN) {
+                        oscc_internal_create(oscc);
+                        /* sending request should be never fail because
+                         * osc use preallocated requests pool */
+                        GOTO(exit_wakeup, rc);
+                }
+        }
+        spin_unlock(&oscc->oscc_lock);
+
+exit_wakeup:
         cfs_waitq_signal(&oscc->oscc_waitq);
         RETURN(rc);
 }
@@ -162,12 +192,13 @@ static int oscc_internal_create(struct osc_creator *oscc)
 
         LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
 
-        if (oscc->oscc_flags & OSCC_FLAG_CREATING ||
-            oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+        if(oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
                 spin_unlock(&oscc->oscc_lock);
                 RETURN(0);
         }
 
+        /* we need check it before OSCC_FLAG_CREATING - because need
+         * see lower number of precreate objects */
         if (oscc->oscc_grow_count < oscc->oscc_max_grow_count &&
             ((oscc->oscc_flags & OSCC_FLAG_LOW) == 0) &&
             (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
@@ -176,6 +207,11 @@ static int oscc_internal_create(struct osc_creator *oscc)
                 oscc->oscc_grow_count *= 2;
         }
 
+        if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
+                spin_unlock(&oscc->oscc_lock);
+                RETURN(0);
+        }
+
         if (oscc->oscc_grow_count > oscc->oscc_max_grow_count / 2)
                 oscc->oscc_grow_count = oscc->oscc_max_grow_count / 2;
 
@@ -206,6 +242,9 @@ static int oscc_internal_create(struct osc_creator *oscc)
         CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
                body->oa.o_id, oscc->oscc_last_id);
 
+        /* we should not resend create request - anyway we will have delorphan
+         * and kill these objects */
+        request->rq_no_delay = request->rq_no_resend = 1;
         ptlrpc_req_set_repsize(request, 2, size);
 
         request->rq_async_args.pointer_arg[0] = oscc;
@@ -215,17 +254,19 @@ static int oscc_internal_create(struct osc_creator *oscc)
         RETURN(0);
 }
 
+/* Report whether the gap between the last precreated id and the next id to
+ * hand out is at least \a count.  Takes no lock itself.  Note that with
+ * count == 1 this needs oscc_last_id to be strictly above oscc_next_id. */
+static int oscc_has_objects_nolock(struct osc_creator *oscc, int count)
+{
+        __s64 gap = (__s64)(oscc->oscc_last_id - oscc->oscc_next_id);
+
+        return gap >= count;
+}
+
+
+/* Locked wrapper around oscc_has_objects_nolock(): samples the answer under
+ * oscc_lock and returns it.  After this change it only reports the state; it
+ * no longer calls oscc_internal_create() when objects are short. */
 static int oscc_has_objects(struct osc_creator *oscc, int count)
 {
         int have_objs;
-        spin_lock(&oscc->oscc_lock);
-        have_objs = ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);
 
-        if (!have_objs) {
-                oscc_internal_create(oscc);
-        } else {
-                spin_unlock(&oscc->oscc_lock);
-        }
+        spin_lock(&oscc->oscc_lock);
+        have_objs = oscc_has_objects_nolock(oscc, count);
+        spin_unlock(&oscc->oscc_lock);
 
         return have_objs;
 }
@@ -236,33 +277,39 @@ static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
         int ost_full;
         int osc_invalid;
 
-        have_objs = oscc_has_objects(oscc, count);
+        osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
 
         spin_lock(&oscc->oscc_lock);
         ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC);
-        spin_unlock(&oscc->oscc_lock);
+        have_objs = oscc_has_objects_nolock(oscc, count);
+        osc_invalid |= oscc->oscc_flags & OSCC_FLAG_EXITING;
 
-        osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
+        if (!ost_full && !osc_invalid)
+                /* they release lock himself */
+                oscc_internal_create(oscc);
+        else
+                spin_unlock(&oscc->oscc_lock);
 
         return have_objs || ost_full || osc_invalid;
 }
 
-static int oscc_precreate(struct osc_creator *oscc, int wait)
+static int oscc_precreate(struct osc_creator *oscc)
 {
-        struct l_wait_info lwi = { 0 };
+        struct l_wait_info lwi;
         int rc = 0;
         ENTRY;
 
         if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
                 RETURN(0);
 
-        if (!wait)
-                RETURN(0);
+        /* we should be not block forever - because client's create rpc can
+         * stick in mds for long time and forbid client reconnect */
+        lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(osc_create_timeout)),
+                          NULL, NULL);
 
-        /* no rc check -- a no-INTR, no-TIMEOUT wait can't fail */
-        l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
+        rc = l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
 
-        if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC))
+        if (!oscc_has_objects(oscc, 1) || (oscc->oscc_flags & OSCC_FLAG_NOSPC))
                 rc = -ENOSPC;
 
         if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
@@ -271,9 +318,9 @@ static int oscc_precreate(struct osc_creator *oscc, int wait)
         RETURN(rc);
 }
 
-int oscc_recovering(struct osc_creator *oscc)
+static int oscc_recovering(struct osc_creator *oscc)
 {
-        int recov = 0;
+        int recov;
 
         spin_lock(&oscc->oscc_lock);
         recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
@@ -282,6 +329,17 @@ int oscc_recovering(struct osc_creator *oscc)
         return recov;
 }
 
+/* Return the OSCC_FLAG_SYNC_IN_PROGRESS bit of oscc_flags (non-zero when
+ * set), read while holding oscc_lock. */
+static int oscc_in_sync(struct osc_creator *oscc)
+{
+        int in_progress;
+
+        spin_lock(&oscc->oscc_lock);
+        in_progress = oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS;
+        spin_unlock(&oscc->oscc_lock);
+
+        return in_progress;
+}
+
 /* decide if the OST has remaining object, return value :
         0 : the OST has remaining object, and don't need to do precreate.
         1 : the OST has no remaining object, and will send a RPC for precreate.
@@ -299,26 +357,150 @@ int osc_precreate(struct obd_export *exp)
         if (imp != NULL && imp->imp_deactive)
                 RETURN(1000);
 
+        /* until oscc in recovery - other flags is wrong */
         if (oscc_recovering(oscc))
                 RETURN(2);
 
         if (oscc->oscc_flags & OSCC_FLAG_NOSPC)
                 RETURN(1000);
 
-        if (oscc->oscc_last_id < oscc->oscc_next_id) {
-                if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS)
-                        RETURN(1);
-
-                spin_lock(&oscc->oscc_lock);
-                if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
-                        spin_unlock(&oscc->oscc_lock);
-                        RETURN(1);
-                }
+        if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
+                RETURN(0);
 
-                oscc_internal_create(oscc);
+        spin_lock(&oscc->oscc_lock);
+        if ((oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) ||
+            (oscc->oscc_flags & OSCC_FLAG_CREATING)) {
+                spin_unlock(&oscc->oscc_lock);
                 RETURN(1);
         }
-        RETURN(0);
+
+        oscc_internal_create(oscc);
+        RETURN(1);
+}
+
+/*
+ * Try to finish one asynchronous create request.
+ *
+ * The caller must hold oscc_lock (asserted below).  \a rc is the status the
+ * caller already has for the request; a non-zero value is passed straight to
+ * the completion callback.
+ *
+ * The request is completed - oinfo->oi_cb_up() is called with the result and
+ * ptlrpc_fakereq_finished() is called on \a req - when:
+ *  - \a rc is non-zero,
+ *  - the creator is exiting (-EIO),
+ *  - an object id is available (it is stored in the obdo and the lsm, 0),
+ *  - the OST is out of space and no recovery is in progress (-ENOSPC).
+ * In those cases the callback's return value is returned.
+ *
+ * Otherwise nothing is completed and -EAGAIN is returned.
+ */
+static int handle_async_create(struct ptlrpc_request *req, int rc)
+{
+        struct osc_create_async_args *aa = ptlrpc_req_async_args(req);
+        struct obd_info *oinfo = aa->rq_oinfo;
+        struct lov_stripe_md *lsm = aa->rq_lsm;
+        struct osc_creator *oscc = aa->rq_oscc;
+        struct obdo *oa = oinfo->oi_oa;
+
+        LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
+
+        if (rc != 0)
+                GOTO(out_wake, rc);
+
+        if (oscc->oscc_flags & OSCC_FLAG_EXITING)
+                GOTO(out_wake, rc = -EIO);
+
+        if (oscc_has_objects_nolock(oscc, 1)) {
+                /* start from the creator's template obdo, then hand out
+                 * the next id */
+                memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
+                lsm->lsm_object_id = oscc->oscc_next_id;
+                oa->o_id = oscc->oscc_next_id;
+                oscc->oscc_next_id++;
+
+                CDEBUG(D_RPCTRACE, " set oscc_next_id = "LPU64"\n",
+                       oscc->oscc_next_id);
+                GOTO(out_wake, rc = 0);
+        }
+
+        /* while recovery is in progress keep waiting instead of reporting
+         * -ENOSPC */
+        if (!(oscc->oscc_flags & OSCC_FLAG_RECOVERING) &&
+            (oscc->oscc_flags & OSCC_FLAG_NOSPC))
+                GOTO(out_wake, rc = -ENOSPC);
+
+        /* recovering, or simply no objects yet - continue to wait */
+        RETURN(-EAGAIN);
+
+out_wake:
+        rc = oinfo->oi_cb_up(oinfo, rc);
+        ptlrpc_fakereq_finished(req);
+
+        RETURN(rc);
+}
+
+/* Interpret callback given to ptlrpc_prep_fakereq() by osc_create_async():
+ * runs handle_async_create() for \a req with status \a rc while holding
+ * oscc_lock and returns its result.  \a env and \a data are not used. */
+static int async_create_interpret(const struct lu_env *env,
+                                  struct ptlrpc_request *req, void *data, int rc)
+{
+        struct osc_create_async_args *aa = ptlrpc_req_async_args(req);
+        struct osc_creator *oscc = aa->rq_oscc;
+        int result;
+
+        spin_lock(&oscc->oscc_lock);
+        result = handle_async_create(req, rc);
+        spin_unlock(&oscc->oscc_lock);
+
+        return result;
+}
+
+/*
+ * Start an object create whose result is delivered through
+ * oinfo->oi_cb_up().
+ *
+ * Creates for a non-zero group (OBD_MD_FLGROUP with o_gr != 0) and
+ * OBD_FL_RECREATE_OBJS requests go straight to osc_real_create(); the
+ * callback is invoked with its result before returning.
+ *
+ * Otherwise a fake request is prepared and handle_async_create() is tried
+ * at once under oscc_lock.  If it answers -EAGAIN the request is handed to
+ * ptlrpcd and linked on oscc_wait_create_list to be completed later.
+ *
+ * Returns the callback's return value when the create completed here,
+ * -ENOMEM when no fake request could be prepared (the callback is still
+ * invoked with -ENOMEM), the ptlrpcd_add_req() error when queueing failed,
+ * or 0 when the create was deferred.
+ */
+int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
+                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+        struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
+        struct obdo *oa = oinfo->oi_oa;
+        struct osc_create_async_args *args;
+        struct ptlrpc_request *fake_req;
+        int rc;
+        ENTRY;
+
+        /* group creates and explicit recreates are done directly */
+        if (((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0)) ||
+            ((oa->o_valid & OBD_MD_FLFLAGS) &&
+             oa->o_flags == OBD_FL_RECREATE_OBJS)) {
+                rc = osc_real_create(exp, oa, ea, oti);
+                rc = oinfo->oi_cb_up(oinfo, rc);
+                RETURN(rc);
+        }
+
+        LASSERT((*ea) != NULL);
+
+        fake_req = ptlrpc_prep_fakereq(oscc->oscc_obd->u.cli.cl_import,
+                                       osc_create_timeout,
+                                       async_create_interpret);
+        if (fake_req == NULL) {
+                rc = oinfo->oi_cb_up(oinfo, -ENOMEM);
+                RETURN(-ENOMEM);
+        }
+
+        /* the per-request state lives inside the request's async args */
+        CLASSERT(sizeof(*args) <= sizeof(fake_req->rq_async_args));
+        args = ptlrpc_req_async_args(fake_req);
+        args->rq_oinfo = oinfo;
+        args->rq_lsm   = *ea;
+        args->rq_oscc  = oscc;
+
+        spin_lock(&oscc->oscc_lock);
+        /* fast path: complete right away if possible */
+        rc = handle_async_create(fake_req, 0);
+        if (rc == -EAGAIN) {
+                /* nothing to hand out yet - park the request */
+                int add_rc = ptlrpcd_add_req(fake_req, PSCOPE_OTHER);
+
+                if (add_rc)
+                        rc = add_rc;
+                else
+                        list_add(&fake_req->rq_list,
+                                 &oscc->oscc_wait_create_list);
+        }
+        spin_unlock(&oscc->oscc_lock);
+
+        /* -EAGAIN here means the request was delayed, which is a success
+         * for the caller */
+        if (rc == -EAGAIN)
+                RETURN(0);
+
+        /* an error was hit or the object is already allocated: the fake
+         * request is no longer needed */
+        ptlrpc_req_finished(fake_req);
+        RETURN(rc);
+}
 
 int osc_create(struct obd_export *exp, struct obdo *oa,
@@ -327,7 +509,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
         struct obd_import  *imp  = exp->exp_obd->u.cli.cl_import;
         struct lov_stripe_md *lsm;
-        int try_again = 1, rc = 0;
+        int rc = 0;
         ENTRY;
 
         LASSERT(oa);
@@ -355,6 +537,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         spin_unlock(&oscc->oscc_lock);
                         RETURN(0);
                 }
+
                 oscc->oscc_flags |= OSCC_FLAG_SYNC_IN_PROGRESS;
                 /* seting flag LOW we prevent extra grow precreate size
                  * and enforce use last assigned size */
@@ -392,15 +575,16 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         CDEBUG(D_HA, "%s: oscc recovery finished, last_id: "
                                LPU64", rc: %d\n", oscc->oscc_obd->obd_name,
                                oscc->oscc_last_id, rc);
-                        cfs_waitq_signal(&oscc->oscc_waitq);
                 } else {
                         CDEBUG(D_ERROR, "%s: oscc recovery failed: %d\n",
                                oscc->oscc_obd->obd_name, rc);
                 }
-                spin_unlock(&oscc->oscc_lock);
 
+                cfs_waitq_signal(&oscc->oscc_waitq);
+                spin_unlock(&oscc->oscc_lock);
 
-                RETURN(rc);
+                if (rc < 0)
+                        RETURN(rc);
         }
 
         lsm = *ea;
@@ -410,27 +594,16 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         RETURN(rc);
         }
 
-        while (try_again) {
-                /* If orphans are being recovered, then we must wait until
-                   it is finished before we can continue with create. */
-                if (oscc_recovering(oscc)) {
-                        struct l_wait_info lwi;
-
+        while (1) {
+                if (oscc_in_sync(oscc))
                         CDEBUG(D_HA,"%s: oscc recovery in progress, waiting\n",
                                oscc->oscc_obd->obd_name);
 
-                        lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(
-                                obd_timeout / 4)), NULL, NULL);
-                        rc = l_wait_event(oscc->oscc_waitq,
-                                          !oscc_recovering(oscc), &lwi);
-                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
-                        if (rc == -ETIMEDOUT) {
-                                CDEBUG(D_HA,"%s: timeout waiting on recovery\n",
-                                       oscc->oscc_obd->obd_name);
-                                RETURN(rc);
-                        }
-                        CDEBUG(D_HA, "%s: oscc recovery over, waking up\n",
-                               oscc->oscc_obd->obd_name);
+                rc = oscc_precreate(oscc);
+                if (rc) {
+                        CDEBUG(D_HA,"%s: error create %d\n",
+                               oscc->oscc_obd->obd_name, rc);
+                        break;
                 }
 
                 spin_lock(&oscc->oscc_lock);
@@ -438,26 +611,31 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         spin_unlock(&oscc->oscc_lock);
                         break;
                 }
+                /* wakeup but recovery not finished */
+                if (oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+                        rc = -EIO;
+                        spin_unlock(&oscc->oscc_lock);
+                        break;
+                }
 
-                if (oscc->oscc_last_id >= oscc->oscc_next_id) {
+                if (oscc_has_objects_nolock(oscc, 1)) {
                         memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
                         oa->o_id = oscc->oscc_next_id;
                         lsm->lsm_object_id = oscc->oscc_next_id;
                         *ea = lsm;
                         oscc->oscc_next_id++;
-                        try_again = 0;
+                        spin_unlock(&oscc->oscc_lock);
 
                         CDEBUG(D_RPCTRACE, "%s: set oscc_next_id = "LPU64"\n",
                                exp->exp_obd->obd_name, oscc->oscc_next_id);
+                        break;
                 } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
                         rc = -ENOSPC;
                         spin_unlock(&oscc->oscc_lock);
                         break;
                 }
+
                 spin_unlock(&oscc->oscc_lock);
-                rc = oscc_precreate(oscc, try_again);
-                if (rc)
-                        break;
         }
 
         if (rc == 0)
@@ -478,7 +656,7 @@ void oscc_init(struct obd_device *obd)
         oscc = &obd->u.cli.cl_oscc;
 
         memset(oscc, 0, sizeof(*oscc));
-        CFS_INIT_LIST_HEAD(&oscc->oscc_list);
+
         cfs_waitq_init(&oscc->oscc_waitq);
         spin_lock_init(&oscc->oscc_lock);
         oscc->oscc_obd = obd;
@@ -488,6 +666,21 @@ void oscc_init(struct obd_device *obd)
         oscc->oscc_next_id = 2;
         oscc->oscc_last_id = 1;
         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+
+        CFS_INIT_LIST_HEAD(&oscc->oscc_wait_create_list);
+
         /* XXX the export handle should give the oscc the last object */
         /* oed->oed_oscc.oscc_last_id = exph->....; */
 }
+
+/* Mark the object creator of \a obd as shutting down: under oscc_lock, clear
+ * OSCC_FLAG_RECOVERING and set OSCC_FLAG_EXITING.  handle_async_create()
+ * answers -EIO once OSCC_FLAG_EXITING is set. */
+void oscc_fini(struct obd_device *obd)
+{
+        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
+        ENTRY;
+
+        spin_lock(&oscc->oscc_lock);
+        oscc->oscc_flags = (oscc->oscc_flags & ~OSCC_FLAG_RECOVERING) |
+                           OSCC_FLAG_EXITING;
+        spin_unlock(&oscc->oscc_lock);
+}