Whamcloud - gitweb
Add async create into HEAD (port from 1.x)
authorshadow <shadow>
Sat, 27 Jun 2009 05:28:28 +0000 (05:28 +0000)
committershadow <shadow>
Sat, 27 Jun 2009 05:28:28 +0000 (05:28 +0000)
Branch HEAD
b=18357
i=rread

16 files changed:
lustre/include/lustre_export.h
lustre/include/lustre_net.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/lov/lov_internal.h
lustre/lov/lov_obd.c
lustre/lov/lov_qos.c
lustre/lov/lov_request.c
lustre/obdclass/lprocfs_status.c
lustre/osc/osc_create.c
lustre/osc/osc_internal.h
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/ptlrpcd.c

index 710f13c..5c328f7 100644 (file)
@@ -74,7 +74,7 @@ struct mdt_export_data {
 
 struct osc_creator {
         spinlock_t              oscc_lock;
-        struct list_head        oscc_list;
+        struct list_head        oscc_wait_create_list;
         struct obd_device       *oscc_obd;
         obd_id                  oscc_last_id;//last available pre-created object
         obd_id                  oscc_next_id;// what object id to give out next
index 32b0291..57463eb 100644 (file)
@@ -364,6 +364,7 @@ struct ptlrpc_request {
                 rq_no_resend:1, rq_waiting:1, rq_receiving_reply:1,
                 rq_no_delay:1, rq_net_err:1, rq_wait_ctx:1,
                 rq_early:1, rq_must_unlink:1,
+                rq_fake:1,          /* this fake req */
                 /* server-side flags */
                 rq_packed_final:1,  /* packed final reply */
                 rq_sent_final:1,    /* stop sending early replies */
@@ -496,6 +497,18 @@ struct ptlrpc_request {
         struct req_capsule          rq_pill;
 };
 
+static inline int ptlrpc_req_interpret(const struct lu_env *env,
+                                       struct ptlrpc_request *req, int rc)
+{
+        if (req->rq_interpret_reply != NULL) {
+                req->rq_status = req->rq_interpret_reply(env, req,
+                                                         &req->rq_async_args,
+                                                         rc);
+                return req->rq_status;
+        }
+        return rc;
+}
+
 static inline void lustre_set_req_swabbed(struct ptlrpc_request *req, int index)
 {
         LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
@@ -970,6 +983,11 @@ struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                              __u32 version, int opcode, char **bufs,
                              struct ptlrpc_cli_ctx *ctx);
+struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp,
+                                           unsigned int timeout,
+                                           ptlrpc_interpterer_t interpreter);
+void ptlrpc_fakereq_finished(struct ptlrpc_request *req);
+
 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version,
                                        int opcode, int count, __u32 *lengths,
                                        char **bufs);
@@ -1324,7 +1342,7 @@ enum ptlrpcd_scope {
 int ptlrpcd_start(const char *name, struct ptlrpcd_ctl *pc);
 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force);
 void ptlrpcd_wake(struct ptlrpc_request *req);
-void ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope);
+int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope);
 void ptlrpcd_add_rqset(struct ptlrpc_request_set *set);
 int ptlrpcd_addref(void);
 void ptlrpcd_decref(void);
index 18d92a2..067c58c 100644 (file)
@@ -1321,6 +1321,9 @@ struct obd_ops {
         int (*o_precreate)(struct obd_export *exp);
         int (*o_create)(struct obd_export *exp,  struct obdo *oa,
                         struct lov_stripe_md **ea, struct obd_trans_info *oti);
+        int (*o_create_async)(struct obd_export *exp,  struct obd_info *oinfo,
+                              struct lov_stripe_md **ea,
+                              struct obd_trans_info *oti);
         int (*o_destroy)(struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *ea, struct obd_trans_info *oti,
                          struct obd_export *md_exp, void *capa);
index 86690a2..c7ef51a 100644 (file)
@@ -676,6 +676,21 @@ static inline int obd_precreate(struct obd_export *exp)
         RETURN(rc);
 }
 
+static inline int obd_create_async(struct obd_export *exp,
+                                   struct obd_info *oinfo,
+                                   struct lov_stripe_md **ea,
+                                   struct obd_trans_info *oti)
+{
+        int rc;
+        ENTRY;
+
+        EXP_CHECK_DT_OP(exp, create_async);
+        EXP_COUNTER_INCREMENT(exp, create_async);
+
+        rc = OBP(exp->exp_obd, create_async)(exp, oinfo, ea, oti);
+        RETURN(rc);
+}
+
 static inline int obd_create(struct obd_export *exp, struct obdo *obdo,
                              struct lov_stripe_md **ea,
                              struct obd_trans_info *oti)
index e76f4ec..8f32543 100644 (file)
@@ -81,6 +81,7 @@ struct lov_request_set {
         struct brw_page         *set_pga;
         struct lov_lock_handles *set_lockh;
         struct list_head         set_list;
+        cfs_waitq_t              set_waitq;
 };
 
 extern cfs_mem_cache_t *lov_oinfo_slab;
@@ -173,6 +174,7 @@ int qos_remedy_create(struct lov_request_set *set, struct lov_request *req);
 
 /* lov_request.c */
 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set);
+int lov_finished_set(struct lov_request_set *set);
 void lov_update_set(struct lov_request_set *set,
                     struct lov_request *req, int rc);
 int lov_update_common_set(struct lov_request_set *set,
@@ -181,6 +183,7 @@ int lov_prep_create_set(struct obd_export *exp, struct obd_info *oifo,
                         struct lov_stripe_md **ea, struct obdo *src_oa,
                         struct obd_trans_info *oti,
                         struct lov_request_set **reqset);
+int cb_create_update(void *cookie, int rc);
 int lov_update_create_set(struct lov_request_set *set,
                           struct lov_request *req, int rc);
 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea);
index 92a32b7..a90739b 100644 (file)
@@ -1101,6 +1101,7 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
         struct obd_info oinfo;
         struct lov_request_set *set = NULL;
         struct lov_request *req;
+        struct l_wait_info  lwi = { 0 };
         int rc = 0;
         ENTRY;
 
@@ -1138,10 +1139,18 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
 
         list_for_each_entry(req, &set->set_list, rq_link) {
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
-                rc = obd_create(lov->lov_tgts[req->rq_idx]->ltd_exp,
-                                req->rq_oi.oi_oa, &req->rq_oi.oi_md, oti);
-                lov_update_create_set(set, req, rc);
+                rc = obd_create_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
+                                      &req->rq_oi, &req->rq_oi.oi_md, oti);
         }
+
+        /* osc_create has a timeout equal to obd_timeout/2, so this wait
+         * will not last longer than that */
+        l_wait_event(set->set_waitq, lov_finished_set(set), &lwi);
+
+        /* we have no ptlrpc set to assign set->interpret to, so we must
+         * call the interpret function ourselves. Calling it from
+         * cb_create_update is not permitted: lov_fini_create_set can sleep
+         * for a long time, and ptlrpcd interpret functions must not sleep. */
         rc = lov_fini_create_set(set, ea);
 out:
         obd_putref(exp->exp_obd);
index b00808b..b5ba722 100644 (file)
@@ -1072,6 +1072,7 @@ int qos_prep_create(struct obd_export *exp, struct lov_request_set *set)
                 req->rq_stripe = i;
                 /* create data objects with "parent" OA */
                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
+                req->rq_oi.oi_cb_up = cb_create_update;
 
                 /* XXX When we start creating objects on demand, we need to
                  *     make sure that we always create the object on the
index 8e119f8..b2ea15c 100644 (file)
@@ -59,6 +59,7 @@ static void lov_init_set(struct lov_request_set *set)
         set->set_cookies = 0;
         CFS_INIT_LIST_HEAD(&set->set_list);
         atomic_set(&set->set_refcount, 1);
+        cfs_waitq_init(&set->set_waitq);
 }
 
 static void lov_finish_set(struct lov_request_set *set)
@@ -93,6 +94,14 @@ static void lov_finish_set(struct lov_request_set *set)
         EXIT;
 }
 
+int lov_finished_set(struct lov_request_set *set)
+{
+        CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
+               set->set_count);
+        return set->set_completes == set->set_count;
+}
+
+
 void lov_update_set(struct lov_request_set *set,
                     struct lov_request *req, int rc)
 {
@@ -102,6 +111,8 @@ void lov_update_set(struct lov_request_set *set,
         set->set_completes++;
         if (rc == 0)
                 set->set_success++;
+
+        cfs_waitq_signal(&set->set_waitq);
 }
 
 int lov_update_common_set(struct lov_request_set *set,
@@ -125,6 +136,7 @@ void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
 {
         list_add_tail(&req->rq_link, &set->set_list);
         set->set_count++;
+        req->rq_rqset = set;
 }
 
 extern void osc_update_enqueue(struct lustre_handle *lov_lockhp,
@@ -313,8 +325,6 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
                         sizeof(struct lov_oinfo *);
 
-
-                req->rq_rqset = set;
                 /* Set lov request specific parameters. */
                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
                 req->rq_oi.oi_cb_up = cb_update_enqueue;
@@ -695,6 +705,16 @@ int lov_update_create_set(struct lov_request_set *set,
         RETURN(0);
 }
 
+int cb_create_update(void *cookie, int rc)
+{
+        struct obd_info *oinfo = cookie;
+        struct lov_request *lovreq;
+
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
+        return lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+}
+
+
 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
                         struct obd_trans_info *oti,
@@ -1003,7 +1023,6 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
                 req->rq_oi.oi_cb_up = cb_getattr_update;
                 req->rq_oi.oi_capa = oinfo->oi_capa;
-                req->rq_rqset = set;
 
                 lov_set_add_req(req, set);
         }
@@ -1201,7 +1220,6 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_setattr_update;
                 req->rq_oi.oi_capa = oinfo->oi_capa;
-                req->rq_rqset = set;
 
                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
                         int off = lov_stripe_offset(oinfo->oi_md,
@@ -1338,7 +1356,6 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
 
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_update_punch;
-                req->rq_rqset = set;
 
                 req->rq_oi.oi_policy.l_extent.start = rs;
                 req->rq_oi.oi_policy.l_extent.end = re;
@@ -1429,7 +1446,6 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
                 req->rq_oi.oi_policy.l_extent.start = rs;
                 req->rq_oi.oi_policy.l_extent.end = re;
                 req->rq_oi.oi_policy.l_extent.gid = -1;
-                req->rq_rqset = set;
 
                 lov_set_add_req(req, set);
         }
@@ -1601,7 +1617,7 @@ static int cb_statfs_update(void *cookie, int rc)
         qos_update(lov);
 out:
         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
-            lovreq->rq_rqset->set_count == lovreq->rq_rqset->set_completes) {
+            lov_finished_set(lovreq->rq_rqset)) {
                lov_statfs_interpret(NULL, lovreq->rq_rqset,
                                     lovreq->rq_rqset->set_success !=
                                                   lovreq->rq_rqset->set_count);
@@ -1650,7 +1666,6 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
                 req->rq_idx = i;
                 req->rq_oi.oi_cb_up = cb_statfs_update;
                 req->rq_oi.oi_flags = oinfo->oi_flags;
-                req->rq_rqset = set;
 
                 lov_set_add_req(req, set);
         }
index 6a337c6..8885773 100644 (file)
@@ -1378,6 +1378,7 @@ void lprocfs_init_ops_stats(int num_private_stats, struct lprocfs_stats *stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, preallocate);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, precreate);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, create);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, create_async);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, destroy);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, setattr);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, setattr_async);
index a851c35..53d6912 100644 (file)
 # include <ctype.h>
 #endif
 
-# include <lustre_dlm.h>
+#include <lustre_dlm.h>
 #include <obd_class.h>
 #include "osc_internal.h"
 
+/* XXX need AT adjust ? */
+#define osc_create_timeout      (obd_timeout / 2)
+
+struct osc_create_async_args {
+        struct osc_creator      *rq_oscc;
+        struct lov_stripe_md    *rq_lsm;
+        struct obd_info         *rq_oinfo;
+};
+
+static int oscc_internal_create(struct osc_creator *oscc);
+static int handle_async_create(struct ptlrpc_request *req, int rc);
+
 static int osc_interpret_create(const struct lu_env *env,
                                 struct ptlrpc_request *req, void *data, int rc)
 {
         struct osc_creator *oscc;
         struct ost_body *body = NULL;
+        struct ptlrpc_request *fake_req, *pos;
         ENTRY;
 
         if (req->rq_repmsg) {
@@ -110,11 +123,6 @@ static int osc_interpret_create(const struct lu_env *env,
                 spin_unlock(&oscc->oscc_lock);
                 break;
         }
-        case -EAGAIN:
-                /* valid race delorphan vs create, or somthing after resend */
-                spin_unlock(&oscc->oscc_lock);
-                DEBUG_REQ(D_INODE, req, "Got EAGAIN - resend \n");
-                break;
         case -ENOSPC:
         case -EROFS:
         case -EFBIG: {
@@ -135,6 +143,15 @@ static int osc_interpret_create(const struct lu_env *env,
                 spin_unlock(&oscc->oscc_lock);
                 break;
         }
+        case -EWOULDBLOCK: {
+                /* aka EAGAIN: do not delay create if the import has failed -
+                 * this keeps the client from getting stuck in create and
+                 * avoids a race with delorphan */
+                oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+                /* oscc->oscc_grow_count = OST_MIN_PRECREATE; */
+                spin_unlock(&oscc->oscc_lock);
+                break;
+        }
         default: {
                 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                 oscc->oscc_grow_count = OST_MIN_PRECREATE;
@@ -149,6 +166,19 @@ static int osc_interpret_create(const struct lu_env *env,
         CDEBUG(D_HA, "preallocated through id "LPU64" (next to use "LPU64")\n",
                oscc->oscc_last_id, oscc->oscc_next_id);
 
+        spin_lock(&oscc->oscc_lock);
+        list_for_each_entry_safe(fake_req, pos,
+                                 &oscc->oscc_wait_create_list, rq_list) {
+                if (handle_async_create(fake_req, rc)  == -EAGAIN) {
+                        oscc_internal_create(oscc);
+                        /* sending the request should never fail because
+                         * osc uses a preallocated request pool */
+                        GOTO(exit_wakeup, rc);
+                }
+        }
+        spin_unlock(&oscc->oscc_lock);
+
+exit_wakeup:
         cfs_waitq_signal(&oscc->oscc_waitq);
         RETURN(rc);
 }
@@ -162,12 +192,13 @@ static int oscc_internal_create(struct osc_creator *oscc)
 
         LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
 
-        if (oscc->oscc_flags & OSCC_FLAG_CREATING ||
-            oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+        if(oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
                 spin_unlock(&oscc->oscc_lock);
                 RETURN(0);
         }
 
+        /* we need to check this before OSCC_FLAG_CREATING because we need
+         * to notice a low number of precreated objects */
         if (oscc->oscc_grow_count < oscc->oscc_max_grow_count &&
             ((oscc->oscc_flags & OSCC_FLAG_LOW) == 0) &&
             (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
@@ -176,6 +207,11 @@ static int oscc_internal_create(struct osc_creator *oscc)
                 oscc->oscc_grow_count *= 2;
         }
 
+        if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
+                spin_unlock(&oscc->oscc_lock);
+                RETURN(0);
+        }
+
         if (oscc->oscc_grow_count > oscc->oscc_max_grow_count / 2)
                 oscc->oscc_grow_count = oscc->oscc_max_grow_count / 2;
 
@@ -206,6 +242,9 @@ static int oscc_internal_create(struct osc_creator *oscc)
         CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
                body->oa.o_id, oscc->oscc_last_id);
 
+        /* we should not resend the create request - delorphan will run
+         * anyway and kill these objects */
+        request->rq_no_delay = request->rq_no_resend = 1;
         ptlrpc_req_set_repsize(request, 2, size);
 
         request->rq_async_args.pointer_arg[0] = oscc;
@@ -215,17 +254,19 @@ static int oscc_internal_create(struct osc_creator *oscc)
         RETURN(0);
 }
 
+static int oscc_has_objects_nolock(struct osc_creator *oscc, int count)
+{
+        return ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);
+}
+
+
 static int oscc_has_objects(struct osc_creator *oscc, int count)
 {
         int have_objs;
-        spin_lock(&oscc->oscc_lock);
-        have_objs = ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);
 
-        if (!have_objs) {
-                oscc_internal_create(oscc);
-        } else {
-                spin_unlock(&oscc->oscc_lock);
-        }
+        spin_lock(&oscc->oscc_lock);
+        have_objs = oscc_has_objects_nolock(oscc, count);
+        spin_unlock(&oscc->oscc_lock);
 
         return have_objs;
 }
@@ -236,33 +277,39 @@ static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
         int ost_full;
         int osc_invalid;
 
-        have_objs = oscc_has_objects(oscc, count);
+        osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
 
         spin_lock(&oscc->oscc_lock);
         ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC);
-        spin_unlock(&oscc->oscc_lock);
+        have_objs = oscc_has_objects_nolock(oscc, count);
+        osc_invalid |= oscc->oscc_flags & OSCC_FLAG_EXITING;
 
-        osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
+        if (!ost_full && !osc_invalid)
+                /* oscc_internal_create releases the lock itself */
+                oscc_internal_create(oscc);
+        else
+                spin_unlock(&oscc->oscc_lock);
 
         return have_objs || ost_full || osc_invalid;
 }
 
-static int oscc_precreate(struct osc_creator *oscc, int wait)
+static int oscc_precreate(struct osc_creator *oscc)
 {
-        struct l_wait_info lwi = { 0 };
+        struct l_wait_info lwi;
         int rc = 0;
         ENTRY;
 
         if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
                 RETURN(0);
 
-        if (!wait)
-                RETURN(0);
+        /* we must not block forever, because the client's create rpc can get
+         * stuck in the mds for a long time and prevent client reconnect */
+        lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(osc_create_timeout)),
+                          NULL, NULL);
 
-        /* no rc check -- a no-INTR, no-TIMEOUT wait can't fail */
-        l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
+        rc = l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
 
-        if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC))
+        if (!oscc_has_objects(oscc, 1) || (oscc->oscc_flags & OSCC_FLAG_NOSPC))
                 rc = -ENOSPC;
 
         if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
@@ -271,9 +318,9 @@ static int oscc_precreate(struct osc_creator *oscc, int wait)
         RETURN(rc);
 }
 
-int oscc_recovering(struct osc_creator *oscc)
+static int oscc_recovering(struct osc_creator *oscc)
 {
-        int recov = 0;
+        int recov;
 
         spin_lock(&oscc->oscc_lock);
         recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
@@ -282,6 +329,17 @@ int oscc_recovering(struct osc_creator *oscc)
         return recov;
 }
 
+static int oscc_in_sync(struct osc_creator *oscc)
+{
+        int sync;
+
+        spin_lock(&oscc->oscc_lock);
+        sync = oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS;
+        spin_unlock(&oscc->oscc_lock);
+
+        return sync;
+}
+
 /* decide if the OST has remaining object, return value :
         0 : the OST has remaining object, and don't need to do precreate.
         1 : the OST has no remaining object, and will send a RPC for precreate.
@@ -299,26 +357,150 @@ int osc_precreate(struct obd_export *exp)
         if (imp != NULL && imp->imp_deactive)
                 RETURN(1000);
 
+        /* while oscc is in recovery the other flags are not valid */
         if (oscc_recovering(oscc))
                 RETURN(2);
 
         if (oscc->oscc_flags & OSCC_FLAG_NOSPC)
                 RETURN(1000);
 
-        if (oscc->oscc_last_id < oscc->oscc_next_id) {
-                if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS)
-                        RETURN(1);
-
-                spin_lock(&oscc->oscc_lock);
-                if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
-                        spin_unlock(&oscc->oscc_lock);
-                        RETURN(1);
-                }
+        if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
+                RETURN(0);
 
-                oscc_internal_create(oscc);
+        spin_lock(&oscc->oscc_lock);
+        if ((oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) ||
+            (oscc->oscc_flags & OSCC_FLAG_CREATING)) {
+                spin_unlock(&oscc->oscc_lock);
                 RETURN(1);
         }
-        RETURN(0);
+
+        oscc_internal_create(oscc);
+        RETURN(1);
+}
+
+static int handle_async_create(struct ptlrpc_request *req, int rc)
+{
+        struct osc_create_async_args *args = ptlrpc_req_async_args(req);
+        struct osc_creator    *oscc = args->rq_oscc;
+        struct lov_stripe_md  *lsm  = args->rq_lsm;
+        struct obd_info       *oinfo = args->rq_oinfo;
+        struct obdo           *oa = oinfo->oi_oa;
+
+        LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
+
+        if(rc)
+                GOTO(out_wake, rc);
+
+        if ((oscc->oscc_flags & OSCC_FLAG_EXITING))
+                GOTO(out_wake, rc = -EIO);
+
+        if (oscc_has_objects_nolock(oscc, 1)) {
+                memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
+                oa->o_id = oscc->oscc_next_id;
+                lsm->lsm_object_id = oscc->oscc_next_id;
+                oscc->oscc_next_id++;
+
+                CDEBUG(D_RPCTRACE, " set oscc_next_id = "LPU64"\n",
+                       oscc->oscc_next_id);
+               GOTO(out_wake, rc = 0);
+        }
+
+        /* should try to wait until recovery has finished */
+        if(oscc->oscc_flags & OSCC_FLAG_RECOVERING)
+                RETURN(-EAGAIN);
+
+        if (oscc->oscc_flags & OSCC_FLAG_NOSPC)
+                GOTO(out_wake, rc = -ENOSPC);
+
+        /* we have no objects now - continue waiting */
+        RETURN(-EAGAIN);
+
+out_wake:
+
+        rc = oinfo->oi_cb_up(oinfo, rc);
+        ptlrpc_fakereq_finished(req);
+
+        RETURN(rc);
+}
+
+static int async_create_interpret(const struct lu_env *env,
+                                  struct ptlrpc_request *req, void *data, int rc)
+{
+        struct osc_create_async_args *args = ptlrpc_req_async_args(req);
+        struct osc_creator    *oscc = args->rq_oscc;
+        int ret;
+
+        spin_lock(&oscc->oscc_lock);
+        ret = handle_async_create(req, rc);
+        spin_unlock(&oscc->oscc_lock);
+
+        return ret;
+}
+
+int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
+                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+        int rc;
+        struct ptlrpc_request *fake_req;
+        struct osc_create_async_args *args;
+        struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
+        struct obdo *oa = oinfo->oi_oa;
+        ENTRY;
+
+        if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0)){
+                rc = osc_real_create(exp, oinfo->oi_oa, ea, oti);
+                rc = oinfo->oi_cb_up(oinfo, rc);
+                RETURN(rc);
+        }
+
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            oa->o_flags == OBD_FL_RECREATE_OBJS) {
+                rc = osc_real_create(exp, oinfo->oi_oa, ea, oti);
+                rc = oinfo->oi_cb_up(oinfo, rc);
+                RETURN(rc);
+        }
+
+        LASSERT((*ea) != NULL);
+
+        fake_req = ptlrpc_prep_fakereq(oscc->oscc_obd->u.cli.cl_import,
+                                       osc_create_timeout,
+                                       async_create_interpret);
+        if (fake_req == NULL) {
+                rc = oinfo->oi_cb_up(oinfo, -ENOMEM);
+                RETURN(-ENOMEM);
+        }
+
+        args = ptlrpc_req_async_args(fake_req);
+        CLASSERT(sizeof(*args) <= sizeof(fake_req->rq_async_args));
+
+        args->rq_oscc  = oscc;
+        args->rq_lsm   = *ea;
+        args->rq_oinfo = oinfo;
+
+        spin_lock(&oscc->oscc_lock);
+        /* try fast path */
+        rc = handle_async_create(fake_req, 0);
+        if (rc == -EAGAIN) {
+                int is_add;
+                /* we have no objects - try to wait */
+                is_add = ptlrpcd_add_req(fake_req, PSCOPE_OTHER);
+                if (!is_add)
+                        list_add(&fake_req->rq_list,
+                                 &oscc->oscc_wait_create_list);
+                else
+                        rc = is_add;
+        }
+        spin_unlock(&oscc->oscc_lock);
+
+        if (rc != -EAGAIN)
+                /* need to free the request if an error was hit or
+                 * the objects were already allocated */
+                ptlrpc_req_finished(fake_req);
+        else
+                /* EAGAIN means the request is delayed */
+                rc = 0;
+
+        RETURN(rc);
 }
 
 int osc_create(struct obd_export *exp, struct obdo *oa,
@@ -327,7 +509,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
         struct obd_import  *imp  = exp->exp_obd->u.cli.cl_import;
         struct lov_stripe_md *lsm;
-        int try_again = 1, rc = 0;
+        int rc = 0;
         ENTRY;
 
         LASSERT(oa);
@@ -355,6 +537,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         spin_unlock(&oscc->oscc_lock);
                         RETURN(0);
                 }
+
                 oscc->oscc_flags |= OSCC_FLAG_SYNC_IN_PROGRESS;
                 /* seting flag LOW we prevent extra grow precreate size
                  * and enforce use last assigned size */
@@ -392,15 +575,16 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         CDEBUG(D_HA, "%s: oscc recovery finished, last_id: "
                                LPU64", rc: %d\n", oscc->oscc_obd->obd_name,
                                oscc->oscc_last_id, rc);
-                        cfs_waitq_signal(&oscc->oscc_waitq);
                 } else {
                         CDEBUG(D_ERROR, "%s: oscc recovery failed: %d\n",
                                oscc->oscc_obd->obd_name, rc);
                 }
-                spin_unlock(&oscc->oscc_lock);
 
+                cfs_waitq_signal(&oscc->oscc_waitq);
+                spin_unlock(&oscc->oscc_lock);
 
-                RETURN(rc);
+                if (rc < 0)
+                        RETURN(rc);
         }
 
         lsm = *ea;
@@ -410,27 +594,16 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         RETURN(rc);
         }
 
-        while (try_again) {
-                /* If orphans are being recovered, then we must wait until
-                   it is finished before we can continue with create. */
-                if (oscc_recovering(oscc)) {
-                        struct l_wait_info lwi;
-
+        while (1) {
+                if (oscc_in_sync(oscc))
                         CDEBUG(D_HA,"%s: oscc recovery in progress, waiting\n",
                                oscc->oscc_obd->obd_name);
 
-                        lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(
-                                obd_timeout / 4)), NULL, NULL);
-                        rc = l_wait_event(oscc->oscc_waitq,
-                                          !oscc_recovering(oscc), &lwi);
-                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
-                        if (rc == -ETIMEDOUT) {
-                                CDEBUG(D_HA,"%s: timeout waiting on recovery\n",
-                                       oscc->oscc_obd->obd_name);
-                                RETURN(rc);
-                        }
-                        CDEBUG(D_HA, "%s: oscc recovery over, waking up\n",
-                               oscc->oscc_obd->obd_name);
+                rc = oscc_precreate(oscc);
+                if (rc) {
+                        CDEBUG(D_HA,"%s: error create %d\n",
+                               oscc->oscc_obd->obd_name, rc);
+                        break;
                 }
 
                 spin_lock(&oscc->oscc_lock);
@@ -438,26 +611,31 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         spin_unlock(&oscc->oscc_lock);
                         break;
                 }
+                /* woken up, but recovery has not finished */
+                if (oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+                        rc = -EIO;
+                        spin_unlock(&oscc->oscc_lock);
+                        break;
+                }
 
-                if (oscc->oscc_last_id >= oscc->oscc_next_id) {
+                if (oscc_has_objects_nolock(oscc, 1)) {
                         memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
                         oa->o_id = oscc->oscc_next_id;
                         lsm->lsm_object_id = oscc->oscc_next_id;
                         *ea = lsm;
                         oscc->oscc_next_id++;
-                        try_again = 0;
+                        spin_unlock(&oscc->oscc_lock);
 
                         CDEBUG(D_RPCTRACE, "%s: set oscc_next_id = "LPU64"\n",
                                exp->exp_obd->obd_name, oscc->oscc_next_id);
+                        break;
                 } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
                         rc = -ENOSPC;
                         spin_unlock(&oscc->oscc_lock);
                         break;
                 }
+
                 spin_unlock(&oscc->oscc_lock);
-                rc = oscc_precreate(oscc, try_again);
-                if (rc)
-                        break;
         }
 
         if (rc == 0)
@@ -478,7 +656,7 @@ void oscc_init(struct obd_device *obd)
         oscc = &obd->u.cli.cl_oscc;
 
         memset(oscc, 0, sizeof(*oscc));
-        CFS_INIT_LIST_HEAD(&oscc->oscc_list);
+
         cfs_waitq_init(&oscc->oscc_waitq);
         spin_lock_init(&oscc->oscc_lock);
         oscc->oscc_obd = obd;
@@ -488,6 +666,21 @@ void oscc_init(struct obd_device *obd)
         oscc->oscc_next_id = 2;
         oscc->oscc_last_id = 1;
         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+
+        CFS_INIT_LIST_HEAD(&oscc->oscc_wait_create_list);
+
         /* XXX the export handle should give the oscc the last object */
         /* oed->oed_oscc.oscc_last_id = exph->....; */
 }
+/*
+ * Mark the object creator embedded in @obd (obd->u.cli.cl_oscc) as
+ * exiting: while holding oscc_lock, clear OSCC_FLAG_RECOVERING and set
+ * OSCC_FLAG_EXITING in oscc_flags.
+ *
+ * This is the same flag update that this change removes from
+ * osc_cleanup().  Nothing else is touched here.
+ */
+void oscc_fini(struct obd_device *obd)
+{
+        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
+        ENTRY;
+
+
+        spin_lock(&oscc->oscc_lock);
+        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
+        oscc->oscc_flags |= OSCC_FLAG_EXITING;
+        spin_unlock(&oscc->oscc_lock);
+}
index d3d55fc..1476019 100644 (file)
@@ -105,6 +105,8 @@ struct osc_cache_waiter {
 int osc_precreate(struct obd_export *exp);
 int osc_create(struct obd_export *exp, struct obdo *oa,
                struct lov_stripe_md **ea, struct obd_trans_info *oti);
+int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
+                     struct lov_stripe_md **ea, struct obd_trans_info *oti);
 int osc_real_create(struct obd_export *exp, struct obdo *oa,
                     struct lov_stripe_md **ea, struct obd_trans_info *oti);
 void oscc_init(struct obd_device *obd);
index e471690..31cb35e 100644 (file)
@@ -4256,18 +4256,12 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 
 int osc_cleanup(struct obd_device *obd)
 {
-        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
         int rc;
 
         ENTRY;
         ptlrpc_lprocfs_unregister_obd(obd);
         lprocfs_obd_cleanup(obd);
 
-        spin_lock(&oscc->oscc_lock);
-        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
-        oscc->oscc_flags |= OSCC_FLAG_EXITING;
-        spin_unlock(&oscc->oscc_lock);
-
         /* free memory of osc quota cache */
         lquota_cleanup(quota_interface, obd);
 
@@ -4317,6 +4311,7 @@ struct obd_ops osc_obd_ops = {
         .o_unpackmd             = osc_unpackmd,
         .o_precreate            = osc_precreate,
         .o_create               = osc_create,
+        .o_create_async         = osc_create_async,
         .o_destroy              = osc_destroy,
         .o_getattr              = osc_getattr,
         .o_getattr_async        = osc_getattr_async,
index 7809aa3..2796d9f 100644 (file)
@@ -218,6 +218,7 @@ static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
                                  sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         oti->oti_logcookies = &repbody->oa.o_lcookie;
+        
         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
         RETURN(0);
index aaad618..6571c84 100644 (file)
@@ -683,6 +683,64 @@ ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count,
                                     NULL);
 }
 
+struct ptlrpc_request *ptlrpc_prep_fakereq(struct obd_import *imp,
+                                           unsigned int timeout,
+                                           ptlrpc_interpterer_t interpreter)
+{
+        struct ptlrpc_request *request = NULL;
+        ENTRY;
+        /*
+         * Build a "fake" request on import @imp.  The request is flagged
+         * rq_fake, starts in RQ_PHASE_RPC with RQ_PHASE_INTERPRET as its
+         * next phase, and has @interpreter installed as rq_interpret_reply.
+         * Its reply deadline is the current time in seconds plus @timeout.
+         *
+         * This function assigns neither rq_reqmsg nor rq_cli_ctx, and the
+         * flags below mark the request as not receiving a reply, not
+         * needing an unlink, and not to be delayed or resent.
+         *
+         * Returns the new request with rq_refcount set to 1, or NULL when
+         * the allocation fails.
+         */
+        OBD_ALLOC(request, sizeof(*request));
+        if (!request) {
+                CERROR("request allocation out of memory\n");
+                RETURN(NULL);
+        }
+
+        request->rq_send_state = LUSTRE_IMP_FULL;
+        request->rq_type = PTL_RPC_MSG_REQUEST;
+        request->rq_import = class_import_get(imp);
+        request->rq_export = NULL;
+
+        request->rq_sent = cfs_time_current_sec();
+        request->rq_reply_deadline = request->rq_sent + timeout;
+        request->rq_interpret_reply = interpreter;
+        request->rq_phase = RQ_PHASE_RPC;
+        request->rq_next_phase = RQ_PHASE_INTERPRET;
+        /* no reply is expected for a fake request */
+        request->rq_receiving_reply = 0;
+        request->rq_must_unlink = 0;
+        request->rq_no_delay = request->rq_no_resend = 1;
+        request->rq_fake = 1;
+
+        spin_lock_init(&request->rq_lock);
+        CFS_INIT_LIST_HEAD(&request->rq_list);
+        CFS_INIT_LIST_HEAD(&request->rq_replay_list);
+        CFS_INIT_LIST_HEAD(&request->rq_set_chain);
+        CFS_INIT_LIST_HEAD(&request->rq_history_list);
+        CFS_INIT_LIST_HEAD(&request->rq_exp_list);
+        cfs_waitq_init(&request->rq_reply_waitq);
+
+        request->rq_xid = ptlrpc_next_xid();
+        atomic_set(&request->rq_refcount, 1);
+
+        RETURN(request);
+}
+/*
+ * Complete a fake request: move @req to RQ_PHASE_COMPLETE and unlink it
+ * from rq_list.  If @req is still in RQ_PHASE_RPC and belongs to a
+ * request set, that set's set_remaining counter is decremented first.
+ * No request reference is dropped here.
+ */
+void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
+{
+        /* The request is being finished while still in the RPC phase
+         * (i.e. before its timeout was handled), so the owning set's
+         * count of outstanding requests has to be adjusted here. */
+        if (req->rq_phase == RQ_PHASE_RPC) {
+                struct ptlrpc_request_set *set = req->rq_set;
+
+                if (set)
+                        set->set_remaining --;
+        }
+
+        ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
+        list_del_init(&req->rq_list);
+}
+
+
 struct ptlrpc_request_set *ptlrpc_prep_set(void)
 {
         struct ptlrpc_request_set *set;
@@ -721,7 +779,8 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                 n++;
         }
 
-        LASSERT(set->set_remaining == 0 || set->set_remaining == n);
+        LASSERTF(set->set_remaining == 0 || set->set_remaining == n, "%d / %d\n",
+                 set->set_remaining, n);
 
         list_for_each_safe(tmp, next, &set->set_requests) {
                 struct ptlrpc_request *req =
@@ -731,17 +790,7 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                 LASSERT(req->rq_phase == expected_phase);
 
                 if (req->rq_phase == RQ_PHASE_NEW) {
-
-                        if (req->rq_interpret_reply != NULL) {
-                                ptlrpc_interpterer_t interpreter =
-                                        req->rq_interpret_reply;
-
-                                /* higher level (i.e. LOV) failed;
-                                 * let the sub reqs clean up */
-                                req->rq_status = -EBADR;
-                                interpreter(NULL, req, &req->rq_async_args,
-                                            req->rq_status);
-                        }
+                        ptlrpc_req_interpret(NULL, req, -EBADR);
                         set->set_remaining--;
                 }
 
@@ -1466,21 +1515,17 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                  * finished. */
                 LASSERT(!req->rq_receiving_reply);
 
-                if (req->rq_interpret_reply != NULL) {
-                        ptlrpc_interpterer_t interpreter =
-                                req->rq_interpret_reply;
-                        req->rq_status = interpreter(env, req,
-                                                     &req->rq_async_args,
-                                                     req->rq_status);
-                }
+                ptlrpc_req_interpret(env, req, req->rq_status);
+
                 ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
 
                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
                        "opc %s:%s:%d:"LPU64":%s:%d\n", cfs_curproc_comm(),
                        imp->imp_obd->obd_uuid.uuid,
-                       lustre_msg_get_status(req->rq_reqmsg), req->rq_xid,
+                       req->rq_reqmsg ? lustre_msg_get_status(req->rq_reqmsg):-1,
+                       req->rq_xid,
                        libcfs_nid2str(imp->imp_connection->c_peer.nid),
-                       lustre_msg_get_opc(req->rq_reqmsg));
+                       req->rq_reqmsg ? lustre_msg_get_opc(req->rq_reqmsg) : -1);
 
                 spin_lock(&imp->imp_lock);
                 /* Request already may be not on sending or delaying list. This
@@ -1534,6 +1579,9 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
                 RETURN(1);
         }
 
+        if (req->rq_fake)
+               RETURN(1);
+
         atomic_inc(&imp->imp_timeouts);
 
         /* The DLM server doesn't want recovery run on its imports. */
@@ -1774,7 +1822,7 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         LASSERTF(list_empty(&request->rq_set_chain), "req %p\n", request);
         LASSERTF(list_empty(&request->rq_exp_list), "req %p\n", request);
         LASSERTF(!request->rq_replay, "req %p\n", request);
-        LASSERT(request->rq_cli_ctx);
+        LASSERT(request->rq_cli_ctx || request->rq_fake);
 
         req_capsule_fini(&request->rq_pill);
 
@@ -1811,7 +1859,8 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
         if (request->rq_reqbuf != NULL || request->rq_clrbuf != NULL)
                 sptlrpc_cli_free_reqbuf(request);
 
-        sptlrpc_req_put_ctx(request, !locked);
+        if (request->rq_cli_ctx)
+                sptlrpc_req_put_ctx(request, !locked);
 
         if (request->rq_pool)
                 __ptlrpc_free_req_to_pool(request);
index 92deb8b..cd8ca0e 100644 (file)
@@ -174,6 +174,8 @@ EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
 EXPORT_SYMBOL(ptlrpc_init_rq_pool);
 EXPORT_SYMBOL(ptlrpc_free_rq_pool);
 EXPORT_SYMBOL(ptlrpc_prep_req_pool);
+EXPORT_SYMBOL(ptlrpc_prep_fakereq);
+EXPORT_SYMBOL(ptlrpc_fakereq_finished);
 EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
 EXPORT_SYMBOL(ptlrpc_request_alloc);
 EXPORT_SYMBOL(ptlrpc_request_alloc_pool);
index 2f21170..8338231 100644 (file)
@@ -128,7 +128,7 @@ EXPORT_SYMBOL(ptlrpcd_add_rqset);
  * Requests that are added to the ptlrpcd queue are sent via
  * ptlrpcd_check->ptlrpc_check_set().
  */
-void ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
+int ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
 {
         struct ptlrpcd_ctl *pc;
         enum pscope_thread  pt;
@@ -153,12 +153,12 @@ void ptlrpcd_add_req(struct ptlrpc_request *req, enum ptlrpcd_scope scope)
                  * so that higher levels might free assosiated
                  * resources.
                  */
-                req->rq_status = -EBADR;
-                interpreter(NULL, req, &req->rq_async_args,
-                            req->rq_status);
+                ptlrpc_req_interpret(NULL, req, -EBADR);
                 req->rq_set = NULL;
                 ptlrpc_req_finished(req);
         }
+
+        return rc;
 }
 
 static int ptlrpcd_check(const struct lu_env *env, struct ptlrpcd_ctl *pc)