Whamcloud - gitweb
Add async create for avoid extra blocking with holding ldlm lock.
authorshadow <shadow>
Fri, 15 May 2009 08:56:47 +0000 (08:56 +0000)
committershadow <shadow>
Fri, 15 May 2009 08:56:47 +0000 (08:56 +0000)
Branch b1_8
b=18357
i=johann
i=zhanghc

18 files changed:
lustre/ChangeLog
lustre/include/lustre_export.h
lustre/include/lustre_net.h
lustre/include/obd.h
lustre/include/obd_class.h
lustre/ldlm/ldlm_lib.c
lustre/lov/lov_internal.h
lustre/lov/lov_obd.c
lustre/lov/lov_qos.c
lustre/lov/lov_request.c
lustre/obdclass/lprocfs_status.c
lustre/osc/osc_create.c
lustre/osc/osc_internal.h
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/ptlrpcd.c

index 53dc3f9..8a7c603 100644 (file)
@@ -14,7 +14,10 @@ tbd Sun Microsystems, Inc.
          of Lustre filesystem with 4K stack may cause a stack overflow. For
          more information, please refer to bugzilla 17630.
 
-
+Severity   : enhancement
+Bugzilla   : 18357
+Description: implement async create (obd_async_create) method for osc, to avoid
+             too long waiting new ost objects with holding ldlm lock.
 
 ------------------------------------------------------------------------------
 tbd Sun Microsystems, Inc.
index a49a6a3..0e333a8 100644 (file)
@@ -63,7 +63,7 @@ struct mds_export_data {
 
 struct osc_creator {
         spinlock_t              oscc_lock;
-        struct list_head        oscc_list;
+        struct list_head        oscc_wait_create_list;
         struct obd_device       *oscc_obd;
         obd_id                  oscc_last_id;//last available pre-created object
         obd_id                  oscc_next_id;// what object id to give out next
index 86dec19..90fdb85 100644 (file)
@@ -395,12 +395,25 @@ struct ptlrpc_request {
         /* Multi-rpc bits */
         struct list_head rq_set_chain;
         struct ptlrpc_request_set *rq_set;
-        void *rq_interpret_reply;               /* Async completion handler */
+        int (*rq_interpret_reply)(struct ptlrpc_request *req, void *data,
+                                  int rc); /* async interpret handler */
         union ptlrpc_async_args rq_async_args;  /* Async completion context */
         struct ptlrpc_request_pool *rq_pool;    /* Pool if request from
                                                    preallocated list */
 };
 
+static inline int ptlrpc_req_interpret(struct ptlrpc_request *req, int rc)
+{
+        if (req->rq_interpret_reply != NULL) {
+                int (*interpreter)(struct ptlrpc_request *, void *, int) =
+                     req->rq_interpret_reply;
+
+                req->rq_status = interpreter(req, &req->rq_async_args, rc);
+                return req->rq_status;
+        }
+        return rc;
+}
+
 static inline void lustre_set_req_swabbed(struct ptlrpc_request *req, int index)
 {
         LASSERT(index < sizeof(req->rq_req_swab_mask) * 8);
@@ -839,6 +852,11 @@ ptlrpc_init_rq_pool(int, int,
                     void (*populate_pool)(struct ptlrpc_request_pool *, int));
 
 void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req);
+struct ptlrpc_request *ptlrpc_prep_fakereq(unsigned int timeout,
+                                           int (*interpreter)(struct ptlrpc_request *,
+                                                              void *, int));
+void ptlrpc_fakereq_finished(struct ptlrpc_request *req);
+
 struct ptlrpc_request *ptlrpc_prep_req(struct obd_import *imp, __u32 version,
                                        int opcode, int count, __u32 *lengths,
                                        char **bufs);
@@ -1140,7 +1158,7 @@ void ping_evictor_stop(void);
 int ptlrpcd_start(char *name, struct ptlrpcd_ctl *pc);
 void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force);
 void ptlrpcd_wake(struct ptlrpc_request *req);
-void ptlrpcd_add_req(struct ptlrpc_request *req);
+int ptlrpcd_add_req(struct ptlrpc_request *req);
 void ptlrpcd_add_rqset(struct ptlrpc_request_set *set);
 int ptlrpcd_addref(void);
 void ptlrpcd_decref(void);
index 19b555b..0c17b50 100644 (file)
@@ -1119,6 +1119,9 @@ struct obd_ops {
         int (*o_precreate)(struct obd_export *exp);
         int (*o_create)(struct obd_export *exp,  struct obdo *oa,
                         struct lov_stripe_md **ea, struct obd_trans_info *oti);
+        int (*o_create_async)(struct obd_export *exp,  struct obd_info *oinfo,
+                              struct lov_stripe_md **ea,
+                              struct obd_trans_info *oti);
         int (*o_destroy)(struct obd_export *exp, struct obdo *oa,
                          struct lov_stripe_md *ea, struct obd_trans_info *oti,
                          struct obd_export *md_exp);
index 0517955..d2a4aa4 100644 (file)
@@ -533,6 +533,21 @@ static inline int obd_precreate(struct obd_export *exp)
         RETURN(rc);
 }
 
+static inline int obd_create_async(struct obd_export *exp,
+                                   struct obd_info *oinfo,
+                                   struct lov_stripe_md **ea,
+                                   struct obd_trans_info *oti)
+{
+        int rc;
+        ENTRY;
+
+        EXP_CHECK_OP(exp, create_async);
+        EXP_COUNTER_INCREMENT(exp, create_async);
+
+        rc = OBP(exp->exp_obd, create_async)(exp, oinfo, ea, oti);
+        RETURN(rc);
+}
+
 static inline int obd_create(struct obd_export *exp, struct obdo *obdo,
                              struct lov_stripe_md **ea,
                              struct obd_trans_info *oti)
index 57f3f2e..45e063e 100644 (file)
@@ -382,7 +382,7 @@ int client_connect_import(struct lustre_handle *dlm_handle,
         down_write(&cli->cl_sem);
         if (cli->cl_conn_count > 0)
                 GOTO(out_sem, rc = -EALREADY);
-        
+
         rc = class_connect(dlm_handle, obd, cluuid);
         if (rc)
                 GOTO(out_sem, rc);
index c6da00c..825f44f 100644 (file)
@@ -81,8 +81,10 @@ struct lov_request_set {
         struct brw_page         *set_pga;
         struct lov_lock_handles *set_lockh;
         struct list_head         set_list;
+        cfs_waitq_t              set_waitq;
 };
 
+
 #define LOV_AP_MAGIC 8200
 
 struct lov_async_page {
@@ -187,6 +189,7 @@ int qos_remedy_create(struct lov_request_set *set, struct lov_request *req);
 
 /* lov_request.c */
 void lov_set_add_req(struct lov_request *req, struct lov_request_set *set);
+int lov_finished_set(struct lov_request_set *set);
 void lov_update_set(struct lov_request_set *set,
                     struct lov_request *req, int rc);
 int lov_update_common_set(struct lov_request_set *set,
@@ -195,6 +198,7 @@ int lov_prep_create_set(struct obd_export *exp, struct obd_info *oifo,
                         struct lov_stripe_md **ea, struct obdo *src_oa,
                         struct obd_trans_info *oti,
                         struct lov_request_set **reqset);
+int cb_create_update(struct obd_info *oinfo, int rc);
 int lov_update_create_set(struct lov_request_set *set,
                           struct lov_request *req, int rc);
 int lov_fini_create_set(struct lov_request_set *set, struct lov_stripe_md **ea);
index 4ee1021..7197b33 100644 (file)
@@ -1177,6 +1177,7 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
         struct obd_info oinfo;
         struct lov_request_set *set = NULL;
         struct lov_request *req;
+        struct l_wait_info  lwi = { 0 };
         int rc = 0;
         ENTRY;
 
@@ -1214,10 +1215,18 @@ static int lov_create(struct obd_export *exp, struct obdo *src_oa,
 
         list_for_each_entry(req, &set->set_list, rq_link) {
                 /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
-                rc = obd_create(lov->lov_tgts[req->rq_idx]->ltd_exp,
-                                req->rq_oi.oi_oa, &req->rq_oi.oi_md, oti);
-                lov_update_create_set(set, req, rc);
+                rc = obd_create_async(lov->lov_tgts[req->rq_idx]->ltd_exp,
+                                      &req->rq_oi, &req->rq_oi.oi_md, oti);
         }
+
+        /* osc_create have timeout equ obd_timeout/2 so waiting don't be
+         * longer then this */
+        l_wait_event(set->set_waitq, lov_finished_set(set), &lwi);
+
+        /* we not have ptlrpc set for assign set->interpret and should
+         * be call interpret function himself. calling from cb_create_update
+         * not permited because lov_fini_create_set can sleep for long time,
+         * but we must avoid sleeping in ptlrpcd interpret function. */
         rc = lov_fini_create_set(set, ea);
 out:
         obd_putref(exp->exp_obd);
index fe76e91..71323a0 100644 (file)
@@ -1042,6 +1042,7 @@ int qos_prep_create(struct obd_export *exp, struct lov_request_set *set)
                 req->rq_stripe = i;
                 /* create data objects with "parent" OA */
                 memcpy(req->rq_oi.oi_oa, src_oa, sizeof(*req->rq_oi.oi_oa));
+                req->rq_oi.oi_cb_up = cb_create_update;
 
                 /* XXX When we start creating objects on demand, we need to
                  *     make sure that we always create the object on the
index 83e7238..bc0bc4b 100644 (file)
@@ -59,6 +59,7 @@ static void lov_init_set(struct lov_request_set *set)
         set->set_cookies = 0;
         CFS_INIT_LIST_HEAD(&set->set_list);
         atomic_set(&set->set_refcount, 1);
+        cfs_waitq_init(&set->set_waitq);
 }
 
 static void lov_finish_set(struct lov_request_set *set)
@@ -93,6 +94,14 @@ static void lov_finish_set(struct lov_request_set *set)
         EXIT;
 }
 
+int lov_finished_set(struct lov_request_set *set)
+{
+        CDEBUG(D_INFO, "check set %d/%d\n", set->set_completes,
+               set->set_count);
+        return set->set_completes == set->set_count;
+}
+
+
 void lov_update_set(struct lov_request_set *set,
                     struct lov_request *req, int rc)
 {
@@ -102,6 +111,8 @@ void lov_update_set(struct lov_request_set *set,
         set->set_completes++;
         if (rc == 0)
                 set->set_success++;
+
+        cfs_waitq_signal(&set->set_waitq);
 }
 
 int lov_update_common_set(struct lov_request_set *set,
@@ -125,6 +136,7 @@ void lov_set_add_req(struct lov_request *req, struct lov_request_set *set)
 {
         list_add_tail(&req->rq_link, &set->set_list);
         set->set_count++;
+        req->rq_rqset = set;
 }
 
 int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc)
@@ -330,8 +342,6 @@ int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,
                         ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +
                         sizeof(struct lov_oinfo *);
 
-
-                req->rq_rqset = set;
                 /* Set lov request specific parameters. */
                 req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;
                 req->rq_oi.oi_cb_up = cb_update_enqueue;
@@ -715,6 +725,15 @@ int lov_update_create_set(struct lov_request_set *set,
         RETURN(0);
 }
 
+int cb_create_update(struct obd_info *oinfo, int rc)
+{
+        struct lov_request *lovreq;
+
+        lovreq = container_of(oinfo, struct lov_request, rq_oi);
+        return lov_update_create_set(lovreq->rq_rqset, lovreq, rc);
+}
+
+
 int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,
                         struct lov_stripe_md **lsmp, struct obdo *src_oa,
                         struct obd_trans_info *oti,
@@ -1019,7 +1038,6 @@ int lov_prep_getattr_set(struct obd_export *exp, struct obd_info *oinfo,
                        sizeof(*req->rq_oi.oi_oa));
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
                 req->rq_oi.oi_cb_up = cb_getattr_update;
-                req->rq_rqset = set;
 
                 lov_set_add_req(req, set);
         }
@@ -1211,7 +1229,6 @@ int lov_prep_setattr_set(struct obd_export *exp, struct obd_info *oinfo,
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_setattr_update;
-                req->rq_rqset = set;
 
                 if (oinfo->oi_oa->o_valid & OBD_MD_FLSIZE) {
                         int off = lov_stripe_offset(oinfo->oi_md,
@@ -1345,7 +1362,6 @@ int lov_prep_punch_set(struct obd_export *exp, struct obd_info *oinfo,
                 req->rq_oi.oi_oa->o_id = loi->loi_id;
                 req->rq_oi.oi_oa->o_stripe_idx = i;
                 req->rq_oi.oi_cb_up = cb_update_punch;
-                req->rq_rqset = set;
 
                 req->rq_oi.oi_policy.l_extent.start = rs;
                 req->rq_oi.oi_policy.l_extent.end = re;
@@ -1446,7 +1462,6 @@ int lov_prep_sync_set(struct obd_export *exp, struct obd_info *oinfo,
                 req->rq_oi.oi_policy.l_extent.end = re;
                 req->rq_oi.oi_policy.l_extent.gid = -1;
                 req->rq_oi.oi_cb_up = cb_sync_update;
-                req->rq_rqset = set;
 
                 lov_set_add_req(req, set);
         }
@@ -1617,7 +1632,7 @@ static int cb_statfs_update(struct obd_info *oinfo, int rc)
         qos_update(lov);
 out:
         if (lovreq->rq_rqset->set_oi->oi_flags & OBD_STATFS_PTLRPCD &&
-            lovreq->rq_rqset->set_count == lovreq->rq_rqset->set_completes) {
+            lov_finished_set(lovreq->rq_rqset)) {
                lov_statfs_interpret(NULL, lovreq->rq_rqset,
                                     lovreq->rq_rqset->set_success !=
                                                   lovreq->rq_rqset->set_count);
@@ -1666,7 +1681,6 @@ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
                 req->rq_idx = i;
                 req->rq_oi.oi_cb_up = cb_statfs_update;
                 req->rq_oi.oi_flags = oinfo->oi_flags;
-                req->rq_rqset = set;
 
                 lov_set_add_req(req, set);
         }
index 59f981d..091fcfc 100644 (file)
@@ -1345,6 +1345,7 @@ void lprocfs_init_ops_stats(int num_private_stats, struct lprocfs_stats *stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, preallocate);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, precreate);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, create);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, create_async);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, destroy);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, setattr);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, setattr_async);
index af45e6c..fcbc00b 100644 (file)
 
 # include <lustre_dlm.h>
 #include <obd_class.h>
+#include <obd.h>
 #include "osc_internal.h"
 
+/* XXX need AT adjust ? */
+#define osc_create_timeout      (obd_timeout / 2)
+
+struct osc_create_async_args {
+        struct osc_creator      *rq_oscc;
+        struct lov_stripe_md    *rq_lsm;
+        struct obd_info         *rq_oinfo;
+};
+
+static int oscc_internal_create(struct osc_creator *oscc);
+static int handle_async_create(struct ptlrpc_request *req, int rc);
+
 static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
 {
         struct osc_creator *oscc;
         struct ost_body *body = NULL;
+        struct ptlrpc_request *fake_req, *pos;
         ENTRY;
 
         if (req->rq_repmsg) {
@@ -109,31 +123,34 @@ static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
                 spin_unlock(&oscc->oscc_lock);
                 break;
         }
-        case -EAGAIN:
-                /* valid race delorphan vs create, or somthing after resend */
-                spin_unlock(&oscc->oscc_lock);
-                DEBUG_REQ(D_INODE, req, "Got EGAIN - resend \n");
-                break;
         case -ENOSPC:
-        case -EROFS: 
+        case -EROFS:
         case -EFBIG: {
                 oscc->oscc_flags |= OSCC_FLAG_NOSPC;
                 if (body && rc == -ENOSPC) {
-                        oscc->oscc_grow_count = OST_MIN_PRECREATE;
                         oscc->oscc_last_id = body->oa.o_id;
+                        oscc->oscc_grow_count = OST_MIN_PRECREATE;
                 }
                 spin_unlock(&oscc->oscc_lock);
                 DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
                 break;
         }
         case -EIO: {
-                /* filter always set body->oa.o_id as the last_id 
+                /* filter always set body->oa.o_id as the last_id
                  * of filter (see filter_handle_precreate for detail)*/
                 if (body && body->oa.o_id > oscc->oscc_last_id)
                         oscc->oscc_last_id = body->oa.o_id;
                 spin_unlock(&oscc->oscc_lock);
                 break;
         }
+        case -EWOULDBLOCK: {
+                /* aka EAGAIN we should not delay create if import failed -
+                 * this avoid client stick in create and avoid race with delorphan */
+                oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+                /* oscc->oscc_grow_count = OST_MIN_PRECREATE; */
+                spin_unlock(&oscc->oscc_lock);
+                break;
+        }
         default: {
                 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                 oscc->oscc_grow_count = OST_MIN_PRECREATE;
@@ -148,6 +165,19 @@ static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
         CDEBUG(D_RPCTRACE, "prealloc through id "LPU64", next to use "LPU64"\n",
                oscc->oscc_last_id, oscc->oscc_next_id);
 
+        spin_lock(&oscc->oscc_lock);
+        list_for_each_entry_safe(fake_req, pos,
+                                 &oscc->oscc_wait_create_list, rq_list) {
+                if (handle_async_create(fake_req, rc)  == -EAGAIN) {
+                        oscc_internal_create(oscc);
+                        /* sending request should be never fail because
+                         * osc use preallocated requests pool */
+                        GOTO(exit_wakeup, rc);
+                }
+        }
+        spin_unlock(&oscc->oscc_lock);
+
+exit_wakeup:
         cfs_waitq_signal(&oscc->oscc_waitq);
         RETURN(rc);
 }
@@ -161,12 +191,13 @@ static int oscc_internal_create(struct osc_creator *oscc)
 
         LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
 
-        if (oscc->oscc_flags & OSCC_FLAG_CREATING ||
-            oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+        if(oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
                 spin_unlock(&oscc->oscc_lock);
                 RETURN(0);
         }
 
+        /* we need check it before OSCC_FLAG_CREATING - because need
+         * see lower number of precreate objects */
         if (oscc->oscc_grow_count < oscc->oscc_max_grow_count &&
             ((oscc->oscc_flags & OSCC_FLAG_LOW) == 0) &&
             (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
@@ -175,6 +206,11 @@ static int oscc_internal_create(struct osc_creator *oscc)
                 oscc->oscc_grow_count *= 2;
         }
 
+        if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
+                spin_unlock(&oscc->oscc_lock);
+                RETURN(0);
+        }
+
         if (oscc->oscc_grow_count > oscc->oscc_max_grow_count / 2)
                 oscc->oscc_grow_count = oscc->oscc_max_grow_count / 2;
 
@@ -204,6 +240,9 @@ static int oscc_internal_create(struct osc_creator *oscc)
         CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
                body->oa.o_id, oscc->oscc_last_id);
 
+        /* we should not resend create request - anyway we will have delorphan
+         * and kill these objects */
+        request->rq_no_delay = request->rq_no_resend = 1;
         ptlrpc_req_set_repsize(request, 2, size);
 
         request->rq_async_args.pointer_arg[0] = oscc;
@@ -213,17 +252,19 @@ static int oscc_internal_create(struct osc_creator *oscc)
         RETURN(0);
 }
 
+static int oscc_has_objects_nolock(struct osc_creator *oscc, int count)
+{
+        return ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);
+}
+
+
 static int oscc_has_objects(struct osc_creator *oscc, int count)
 {
         int have_objs;
-        spin_lock(&oscc->oscc_lock);
-        have_objs = ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);
 
-        if (!have_objs) {
-                oscc_internal_create(oscc);
-        } else {
-                spin_unlock(&oscc->oscc_lock);
-        }
+        spin_lock(&oscc->oscc_lock);
+        have_objs = oscc_has_objects_nolock(oscc, count);
+        spin_unlock(&oscc->oscc_lock);
 
         return have_objs;
 }
@@ -234,33 +275,39 @@ static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
         int ost_full;
         int osc_invalid;
 
-        have_objs = oscc_has_objects(oscc, count);
+        osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
 
         spin_lock(&oscc->oscc_lock);
         ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC);
-        spin_unlock(&oscc->oscc_lock);
+        have_objs = oscc_has_objects_nolock(oscc, count);
+        osc_invalid |= oscc->oscc_flags & OSCC_FLAG_EXITING;
 
-        osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
+        if (!ost_full || !osc_invalid)
+                /* they release lock himself */
+                oscc_internal_create(oscc);
+        else
+                spin_unlock(&oscc->oscc_lock);
 
         return have_objs || ost_full || osc_invalid;
 }
 
-static int oscc_precreate(struct osc_creator *oscc, int wait)
+static int oscc_precreate(struct osc_creator *oscc)
 {
-        struct l_wait_info lwi = { 0 };
+        struct l_wait_info lwi;
         int rc = 0;
         ENTRY;
 
         if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
                 RETURN(0);
 
-        if (!wait)
-                RETURN(0);
+        /* we should be not block forever - because client's create rpc can
+         * stick in mds for long time and forbid client reconnect */
+        lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(osc_create_timeout)),
+                          NULL, NULL);
 
-        /* no rc check -- a no-INTR, no-TIMEOUT wait can't fail */
-        l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
+        rc = l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
 
-        if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC))
+        if (!oscc_has_objects(oscc, 1) || (oscc->oscc_flags & OSCC_FLAG_NOSPC))
                 rc = -ENOSPC;
 
         if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
@@ -269,9 +316,9 @@ static int oscc_precreate(struct osc_creator *oscc, int wait)
         RETURN(rc);
 }
 
-int oscc_recovering(struct osc_creator *oscc)
+static int oscc_recovering(struct osc_creator *oscc)
 {
-        int recov = 0;
+        int recov;
 
         spin_lock(&oscc->oscc_lock);
         recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
@@ -280,6 +327,17 @@ int oscc_recovering(struct osc_creator *oscc)
         return recov;
 }
 
+static int oscc_in_sync(struct osc_creator *oscc)
+{
+        int sync;
+
+        spin_lock(&oscc->oscc_lock);
+        sync = oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS;
+        spin_unlock(&oscc->oscc_lock);
+
+        return sync;
+}
+
 /* decide if the OST has remaining object, return value :
         0 : the OST has remaining object, and don't need to do precreate.
         1 : the OST has no remaining object, and will send a RPC for precreate.
@@ -297,35 +355,157 @@ int osc_precreate(struct obd_export *exp)
         if (imp != NULL && imp->imp_deactive)
                 RETURN(1000);
 
+        /* until oscc in recovery - other flags is wrong */
         if (oscc_recovering(oscc))
                 RETURN(2);
 
         if (oscc->oscc_flags & OSCC_FLAG_NOSPC)
                 RETURN(1000);
 
-        if (oscc->oscc_last_id < oscc->oscc_next_id) {
-                if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS)
-                        RETURN(1);
-
-                spin_lock(&oscc->oscc_lock);
-                if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
-                        spin_unlock(&oscc->oscc_lock);
-                        RETURN(1);
-                }
+        if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
+                RETURN(0);
 
-                oscc_internal_create(oscc);
+        spin_lock(&oscc->oscc_lock);
+        if ((oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) ||
+            (oscc->oscc_flags & OSCC_FLAG_CREATING)) {
+                spin_unlock(&oscc->oscc_lock);
                 RETURN(1);
         }
-        RETURN(0);
+
+        oscc_internal_create(oscc);
+        RETURN(1);
+}
+
+static int handle_async_create(struct ptlrpc_request *req, int rc)
+{
+        struct osc_create_async_args *args = ptlrpc_req_async_args(req);
+        struct osc_creator    *oscc = args->rq_oscc;
+        struct lov_stripe_md  *lsm  = args->rq_lsm;
+        struct obd_info       *oinfo = args->rq_oinfo;
+        struct obdo           *oa = oinfo->oi_oa;
+
+        LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
+
+        if(rc)
+                GOTO(out_wake, rc);
+
+        if ((oscc->oscc_flags & OSCC_FLAG_EXITING))
+                GOTO(out_wake, rc = -EIO);
+
+        if (oscc_has_objects_nolock(oscc, 1)) {
+                memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
+                oa->o_id = oscc->oscc_next_id;
+                lsm->lsm_object_id = oscc->oscc_next_id;
+                oscc->oscc_next_id++;
+
+                CDEBUG(D_RPCTRACE, " set oscc_next_id = "LPU64"\n",
+                       oscc->oscc_next_id);
+               GOTO(out_wake, rc = 0);
+        }
+
+        /* should be try wait until recovery finished */
+        if(oscc->oscc_flags & OSCC_FLAG_RECOVERING)
+                RETURN(-EAGAIN);
+
+        if (oscc->oscc_flags & OSCC_FLAG_NOSPC)
+                GOTO(out_wake, rc = -ENOSPC);
+
+        /* we not have objects now - continue wait */
+        RETURN(-EAGAIN);
+
+out_wake:
+
+        rc = oinfo->oi_cb_up(oinfo, rc);
+        ptlrpc_fakereq_finished(req);
+
+        RETURN(rc);
+}
+
+static int async_create_interpret(struct ptlrpc_request *req, void *data, int rc)
+{
+        struct osc_create_async_args *args = ptlrpc_req_async_args(req);
+        struct osc_creator    *oscc = args->rq_oscc;
+        int ret;
+
+        spin_lock(&oscc->oscc_lock);
+        ret = handle_async_create(req, rc);
+        spin_unlock(&oscc->oscc_lock);
+
+        return ret;
 }
 
+int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
+                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
+{
+        int rc;
+        struct ptlrpc_request *fake_req;
+        struct osc_create_async_args *args;
+        struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
+        struct obdo *oa = oinfo->oi_oa;
+        ENTRY;
+
+        if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0)){
+                rc = osc_real_create(exp, oinfo->oi_oa, ea, oti);
+                rc = oinfo->oi_cb_up(oinfo, rc);
+                RETURN(rc);
+        }
+
+        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+            oa->o_flags == OBD_FL_RECREATE_OBJS) {
+                rc = osc_real_create(exp, oinfo->oi_oa, ea, oti);
+                rc = oinfo->oi_cb_up(oinfo, rc);
+                RETURN(rc);
+        }
+
+        LASSERT((*ea) != NULL);
+
+        fake_req = ptlrpc_prep_fakereq(osc_create_timeout, async_create_interpret);
+        if (fake_req == NULL) {
+                rc = oinfo->oi_cb_up(oinfo, -ENOMEM);
+                RETURN(-ENOMEM);
+        }
+
+        args = ptlrpc_req_async_args(fake_req);
+        CLASSERT(sizeof(*args) <= sizeof(fake_req->rq_async_args));
+
+        args->rq_oscc  = oscc;
+        args->rq_lsm   = *ea;
+        args->rq_oinfo = oinfo;
+
+        spin_lock(&oscc->oscc_lock);
+        /* try fast path */
+        rc = handle_async_create(fake_req, 0);
+        if (rc == -EAGAIN) {
+                int is_add;
+                /* we not have objects - try wait */
+                is_add = ptlrpcd_add_req(fake_req);
+                if (!is_add)
+                        list_add(&fake_req->rq_list,
+                                 &oscc->oscc_wait_create_list);
+                else
+                        rc = is_add;
+        }
+        spin_unlock(&oscc->oscc_lock);
+
+        if (rc != -EAGAIN)
+                /* need free request if was error hit or
+                 * objects already allocated */
+                ptlrpc_req_finished(fake_req);
+        else
+                /* EAGAIN mean - request is delayed */
+                rc = 0;
+
+        RETURN(rc);
+}
+
+
 int osc_create(struct obd_export *exp, struct obdo *oa,
                struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
         struct lov_stripe_md *lsm;
         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
-        struct obd_import  *imp  = exp->exp_obd->u.cli.cl_import;
-        int try_again = 1, rc = 0;
+        int rc = 0;
+
         ENTRY;
         LASSERT(oa);
         LASSERT(ea);
@@ -350,6 +530,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         spin_unlock(&oscc->oscc_lock);
                         RETURN(0);
                 }
+
                 oscc->oscc_flags |= OSCC_FLAG_SYNC_IN_PROGRESS;
                 /* seting flag LOW we prevent extra grow precreate size
                  * and enforce use last assigned size */
@@ -369,11 +550,13 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 oscc->oscc_flags &= ~OSCC_FLAG_SYNC_IN_PROGRESS;
                 if (rc == 0 || rc == -ENOSPC) {
                         struct obd_connect_data *ocd;
+                       struct obd_import *imp = oscc->oscc_obd->u.cli.cl_import;
 
                         if (rc == -ENOSPC)
                                 oscc->oscc_flags |= OSCC_FLAG_NOSPC;
                         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
                         oscc->oscc_last_id = oa->o_id;
+
                         ocd = &imp->imp_connect_data;
                         if (ocd->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN) {
                                 CDEBUG(D_HA, "%s: Skip orphan set, reset last "
@@ -384,15 +567,16 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         CDEBUG(D_HA, "%s: oscc recovery finished, last_id: "
                                LPU64", rc: %d\n", oscc->oscc_obd->obd_name,
                                oscc->oscc_last_id, rc);
-                        cfs_waitq_signal(&oscc->oscc_waitq);
                 } else {
                         CDEBUG(D_ERROR, "%s: oscc recovery failed: %d\n",
                                oscc->oscc_obd->obd_name, rc);
                 }
-                spin_unlock(&oscc->oscc_lock);
 
+                cfs_waitq_signal(&oscc->oscc_waitq);
+                spin_unlock(&oscc->oscc_lock);
 
-                RETURN(rc);
+                if (rc < 0)
+                        RETURN(rc);
         }
 
         lsm = *ea;
@@ -402,27 +586,16 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         RETURN(rc);
         }
 
-        while (try_again) {
-                /* If orphans are being recovered, then we must wait until
-                   it is finished before we can continue with create. */
-                if (oscc_recovering(oscc)) {
-                        struct l_wait_info lwi;
-
+        while (1) {
+                if (oscc_in_sync(oscc))
                         CDEBUG(D_HA,"%s: oscc recovery in progress, waiting\n",
                                oscc->oscc_obd->obd_name);
 
-                        lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(
-                                obd_timeout / 4)), NULL, NULL);
-                        rc = l_wait_event(oscc->oscc_waitq,
-                                          !oscc_recovering(oscc), &lwi);
-                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
-                        if (rc == -ETIMEDOUT) {
-                                CDEBUG(D_HA,"%s: timeout waiting on recovery\n",
-                                       oscc->oscc_obd->obd_name);
-                                RETURN(rc);
-                        }
-                        CDEBUG(D_HA, "%s: oscc recovery over, waking up\n",
-                               oscc->oscc_obd->obd_name);
+                rc = oscc_precreate(oscc);
+                if (rc) {
+                        CDEBUG(D_HA,"%s: error create %d\n",
+                               oscc->oscc_obd->obd_name, rc);
+                        break;
                 }
 
                 spin_lock(&oscc->oscc_lock);
@@ -430,26 +603,31 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         spin_unlock(&oscc->oscc_lock);
                         break;
                 }
+                /* wakeup but recovery not finished */
+                if (oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+                        rc = -EIO;
+                        spin_unlock(&oscc->oscc_lock);
+                        break;
+                }
 
-                if (oscc->oscc_last_id >= oscc->oscc_next_id) {
+                if (oscc_has_objects_nolock(oscc, 1)) {
                         memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
                         oa->o_id = oscc->oscc_next_id;
                         lsm->lsm_object_id = oscc->oscc_next_id;
                         *ea = lsm;
                         oscc->oscc_next_id++;
-                        try_again = 0;
+                        spin_unlock(&oscc->oscc_lock);
 
                         CDEBUG(D_RPCTRACE, "%s: set oscc_next_id = "LPU64"\n",
                                exp->exp_obd->obd_name, oscc->oscc_next_id);
+                        break;
                 } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
                         rc = -ENOSPC;
                         spin_unlock(&oscc->oscc_lock);
                         break;
                 }
+
                 spin_unlock(&oscc->oscc_lock);
-                rc = oscc_precreate(oscc, try_again);
-                if (rc)
-                        break;
         }
 
         if (rc == 0)
@@ -470,7 +648,7 @@ void oscc_init(struct obd_device *obd)
         oscc = &obd->u.cli.cl_oscc;
 
         memset(oscc, 0, sizeof(*oscc));
-        CFS_INIT_LIST_HEAD(&oscc->oscc_list);
+
         cfs_waitq_init(&oscc->oscc_waitq);
         spin_lock_init(&oscc->oscc_lock);
         oscc->oscc_obd = obd;
@@ -480,6 +658,21 @@ void oscc_init(struct obd_device *obd)
         oscc->oscc_next_id = 2;
         oscc->oscc_last_id = 1;
         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+
+        CFS_INIT_LIST_HEAD(&oscc->oscc_wait_create_list);
+
         /* XXX the export handle should give the oscc the last object */
         /* oed->oed_oscc.oscc_last_id = exph->....; */
 }
+
+void oscc_fini(struct obd_device *obd)
+{
+        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
+        ENTRY;
+
+
+        spin_lock(&oscc->oscc_lock);
+        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
+        oscc->oscc_flags |= OSCC_FLAG_EXITING;
+        spin_unlock(&oscc->oscc_lock);
+}
index d5f6f3e..d5c79ef 100644 (file)
@@ -92,6 +92,8 @@ struct osc_cache_waiter {
 int osc_precreate(struct obd_export *exp);
 int osc_create(struct obd_export *exp, struct obdo *oa,
                struct lov_stripe_md **ea, struct obd_trans_info *oti);
+int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
+                     struct lov_stripe_md **ea, struct obd_trans_info *oti);
 int osc_real_create(struct obd_export *exp, struct obdo *oa,
                     struct lov_stripe_md **ea, struct obd_trans_info *oti);
 void oscc_init(struct obd_device *obd);
index 97a0f17..4f64d97 100644 (file)
@@ -4340,18 +4340,12 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
 
 int osc_cleanup(struct obd_device *obd)
 {
-        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
         int rc;
 
         ENTRY;
         ptlrpc_lprocfs_unregister_obd(obd);
         lprocfs_obd_cleanup(obd);
 
-        spin_lock(&oscc->oscc_lock);
-        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
-        oscc->oscc_flags |= OSCC_FLAG_EXITING;
-        spin_unlock(&oscc->oscc_lock);
-
         /* free memory of osc quota cache */
         lquota_cleanup(quota_interface, obd);
 
@@ -4441,6 +4435,7 @@ struct obd_ops osc_obd_ops = {
         .o_unpackmd             = osc_unpackmd,
         .o_precreate            = osc_precreate,
         .o_create               = osc_create,
+        .o_create_async         = osc_create_async,
         .o_destroy              = osc_destroy,
         .o_getattr              = osc_getattr,
         .o_getattr_async        = osc_getattr_async,
index 1b21669..5edbd75 100644 (file)
@@ -202,6 +202,7 @@ static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
                                  sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
         oti->oti_logcookies = &repbody->oa.o_lcookie;
+        
         req->rq_status = obd_create(exp, &repbody->oa, NULL, oti);
         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
         RETURN(0);
index 9e77607..207e472 100644 (file)
@@ -617,6 +617,63 @@ ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count,
                                     NULL);
 }
 
+struct ptlrpc_request *ptlrpc_prep_fakereq(unsigned int timeout,
+                                           int (*interpreter)(struct ptlrpc_request *,
+                                                              void *, int))
+{
+        struct ptlrpc_request *request = NULL;
+        ENTRY;
+
+        OBD_ALLOC(request, sizeof(*request));
+        if (!request) {
+                CERROR("request allocation out of memory\n");
+                RETURN(NULL);
+        }
+
+        request->rq_send_state = LUSTRE_IMP_FULL;
+        request->rq_type = PTL_RPC_MSG_REQUEST;
+        request->rq_import = NULL;
+        request->rq_export = NULL;
+
+        request->rq_sent = cfs_time_current_sec();
+        request->rq_reply_deadline = request->rq_sent + timeout;
+        request->rq_interpret_reply = interpreter;
+        request->rq_phase = RQ_PHASE_RPC;
+        request->rq_next_phase = RQ_PHASE_INTERPRET;
+        /* don't want reply */
+        request->rq_receiving_reply = 0;
+        request->rq_must_unlink = 0;
+        request->rq_no_delay = request->rq_no_resend = 1;
+
+        spin_lock_init(&request->rq_lock);
+        CFS_INIT_LIST_HEAD(&request->rq_list);
+        CFS_INIT_LIST_HEAD(&request->rq_replay_list);
+        CFS_INIT_LIST_HEAD(&request->rq_set_chain);
+        CFS_INIT_LIST_HEAD(&request->rq_history_list);
+        CFS_INIT_LIST_HEAD(&request->rq_exp_list);
+        cfs_waitq_init(&request->rq_reply_waitq);
+
+        request->rq_xid = ptlrpc_next_xid();
+        atomic_set(&request->rq_refcount, 1);
+
+        RETURN(request);
+}
+
+void ptlrpc_fakereq_finished(struct ptlrpc_request *req)
+{
+        /* if we kill request before timeout - need adjust counter */
+        if (req->rq_phase == RQ_PHASE_RPC) {
+                struct ptlrpc_request_set *set = req->rq_set;
+
+                if (set)
+                        set->set_remaining --;
+        }
+
+        ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
+        list_del_init(&req->rq_list);
+}
+
+
 struct ptlrpc_request_set *ptlrpc_prep_set(void)
 {
         struct ptlrpc_request_set *set;
@@ -655,7 +712,8 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
                 n++;
         }
 
-        LASSERT(set->set_remaining == 0 || set->set_remaining == n);
+        LASSERTF(set->set_remaining == 0 || set->set_remaining == n, "%d / %d\n",
+                 set->set_remaining, n);
 
         list_for_each_safe(tmp, next, &set->set_requests) {
                 struct ptlrpc_request *req =
@@ -1176,7 +1234,6 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                 if (req->rq_err) {
                         if (req->rq_status == 0)
                                 req->rq_status = -EIO;
-                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
@@ -1187,7 +1244,6 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                  * interrupted rpcs after they have timed out */
                 if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
                         req->rq_status = -EINTR;
-                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
 
@@ -1210,15 +1266,11 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
 
                                 if (status != 0)  {
                                         req->rq_status = status;
-                                        ptlrpc_rqphase_move(req, 
-                                                RQ_PHASE_INTERPRET);
                                         spin_unlock(&imp->imp_lock);
                                         GOTO(interpret, req->rq_status);
                                 }
                                 if (req->rq_no_resend) {
                                         req->rq_status = -ENOTCONN;
-                                        ptlrpc_rqphase_move(req, 
-                                                RQ_PHASE_INTERPRET);
                                         spin_unlock(&imp->imp_lock);
                                         GOTO(interpret, req->rq_status);
                                 }
@@ -1293,10 +1345,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                          * process the reply. Similarly if the RPC returned
                          * an error, and therefore the bulk will never arrive.
                          */
-                        if (req->rq_bulk == NULL || req->rq_status != 0) {
-                                ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
+                        if (req->rq_bulk == NULL || req->rq_status != 0)
                                 GOTO(interpret, req->rq_status);
-                        }
 
                         ptlrpc_rqphase_move(req, RQ_PHASE_BULK);
                 }
@@ -1312,15 +1362,11 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                          * the ACK for her PUT. */
                         DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
                         req->rq_status = -EIO;
-                        ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
                         GOTO(interpret, req->rq_status);
                 }
-
+      interpret:
                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
-        interpret:
-                LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
-
                 /* This moves to "unregistering" phase we need to wait for
                  * reply unlink. */
                 if (!ptlrpc_unregister_reply(req, 1))
@@ -1333,12 +1379,8 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                  * finished. */
                 LASSERT(!req->rq_receiving_reply);
 
-                if (req->rq_interpret_reply != NULL) {
-                        int (*interpreter)(struct ptlrpc_request *,void *,int) =
-                                req->rq_interpret_reply;
-                        req->rq_status = interpreter(req, &req->rq_async_args,
-                                                     req->rq_status);
-                }
+                ptlrpc_req_interpret(req, req->rq_status);
+
                 ptlrpc_rqphase_move(req, RQ_PHASE_COMPLETE);
 
                 CDEBUG(D_RPCTRACE, "Completed RPC pname:cluuid:pid:xid:nid:"
index 0efa98d..b33d69d 100644 (file)
@@ -162,6 +162,8 @@ EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
 EXPORT_SYMBOL(ptlrpc_init_rq_pool);
 EXPORT_SYMBOL(ptlrpc_free_rq_pool);
 EXPORT_SYMBOL(ptlrpc_prep_req_pool);
+EXPORT_SYMBOL(ptlrpc_prep_fakereq);
+EXPORT_SYMBOL(ptlrpc_fakereq_finished);
 EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
 EXPORT_SYMBOL(ptlrpc_prep_req);
 EXPORT_SYMBOL(ptlrpc_unregister_reply);
index c5bb52e..fe4a7d6 100644 (file)
@@ -95,7 +95,7 @@ EXPORT_SYMBOL(ptlrpcd_add_rqset);
  * Requests that are added to the ptlrpcd queue are sent via
  * ptlrpcd_check->ptlrpc_check_set().
  */
-void ptlrpcd_add_req(struct ptlrpc_request *req)
+int ptlrpcd_add_req(struct ptlrpc_request *req)
 {
         struct ptlrpcd_ctl *pc;
         int rc;
@@ -106,11 +106,6 @@ void ptlrpcd_add_req(struct ptlrpc_request *req)
                 pc = &ptlrpcd_recovery_pc;
         rc = ptlrpc_set_add_new_req(pc, req);
         if (rc) {
-                int (*interpreter)(struct ptlrpc_request *,
-                                   void *, int);
-
-                interpreter = req->rq_interpret_reply;
-
                 /*
                  * Thread is probably in stop now so we need to
                  * kill this rpc as it was not added. Let's call
@@ -118,12 +113,12 @@ void ptlrpcd_add_req(struct ptlrpc_request *req)
                  * so that higher levels might free assosiated
                  * resources.
                 */
-                req->rq_status = -EBADR;
-                interpreter(req, &req->rq_async_args,
-                            req->rq_status);
+
+                ptlrpc_req_interpret(req, -EBADR);
                 req->rq_set = NULL;
                 ptlrpc_req_finished(req);
         }
+        return rc;
 }
 
 static int ptlrpcd_check(struct ptlrpcd_ctl *pc)