Whamcloud - gitweb
LU-3467 target: generic hpreq handler in target 83/7383/41
authorMikhail Pershin <mike.pershin@intel.com>
Sun, 18 Aug 2013 12:53:24 +0000 (16:53 +0400)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 9 Jan 2014 22:30:20 +0000 (22:30 +0000)
Make high-priority request handling generic. Each request handler
may now set up not only the generic handler but also a high-priority
handler. Move the OST-specific hp callbacks to the OFD.

Remove rq_recov_session from ptlrpc_request and always use rq_session.
The additional session was needed when a recovery request was copied,
because the normal session could then be freed. Now the request is
referenced rather than copied, so a single session is enough.

Signed-off-by: Mikhail Pershin <mike.pershin@intel.com>
Change-Id: Iabf36d0828a86974bfe0638957f6018c919ac13b
Reviewed-on: http://review.whamcloud.com/7383
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
lustre/include/lu_target.h
lustre/include/lustre_net.h
lustre/ldlm/ldlm_lib.c
lustre/ofd/ofd_dev.c
lustre/ost/ost_handler.c
lustre/ptlrpc/ptlrpc_internal.h
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/service.c
lustre/target/tgt_handler.c

index 879b573..9d952bf 100644 (file)
@@ -117,6 +117,7 @@ struct tgt_session_info {
         * Additional fail id that can be set by handler.
         */
        int                      tsi_reply_fail_id;
+       bool                     tsi_preprocessed;
        /* request JobID */
        char                    *tsi_jobid;
 };
@@ -207,9 +208,9 @@ struct tgt_handler {
        /* Request version for this opcode */
        int                      th_version;
        /* Handler function */
-       int                     (*th_act)(struct tgt_session_info *tti);
+       int                     (*th_act)(struct tgt_session_info *tsi);
        /* Handler function for high priority requests */
-       int                     (*th_hp)(struct tgt_session_info *tti);
+       void                    (*th_hp)(struct tgt_session_info *tsi);
        /* Request format for this request */
        const struct req_format *th_fmt;
 };
@@ -296,6 +297,8 @@ struct tgt_commit_cb {
        void     *tgt_cb_data;
 };
 
+int tgt_hpreq_handler(struct ptlrpc_request *req);
+
 /* target/tgt_main.c */
 void tgt_boot_epoch_update(struct lu_target *lut);
 int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
@@ -360,7 +363,7 @@ static inline void tgt_drop_id(struct obd_export *exp, struct obdo *oa)
 /*
  * Unified target generic handers macros and generic functions.
  */
-#define TGT_RPC_HANDLER(base, flags, opc, fn, fmt, version)            \
+#define TGT_RPC_HANDLER_HP(base, flags, opc, fn, hp, fmt, version)     \
 [opc - base] = {                                                       \
        .th_name        = #opc,                                         \
        .th_fail_id     = OBD_FAIL_ ## opc ## _NET,                     \
@@ -368,8 +371,11 @@ static inline void tgt_drop_id(struct obd_export *exp, struct obdo *oa)
        .th_flags       = flags,                                        \
        .th_act         = fn,                                           \
        .th_fmt         = fmt,                                          \
-       .th_version     = version                                       \
+       .th_version     = version,                                      \
+       .th_hp          = hp,                                           \
 }
+#define TGT_RPC_HANDLER(base, flags, opc, fn, fmt, version)            \
+       TGT_RPC_HANDLER_HP(base, flags, opc, fn, NULL, fmt, version)
 
 /* MDT Request with a format known in advance */
 #define TGT_MDT_HDL(flags, name, fn)                                   \
@@ -380,10 +386,13 @@ static inline void tgt_drop_id(struct obd_export *exp, struct obdo *oa)
        TGT_RPC_HANDLER(MDS_FIRST_OPC, flags, name, fn, NULL,           \
                        LUSTRE_MDS_VERSION)
 
-/* MDT Request with a format known in advance */
+/* OST Request with a format known in advance */
 #define TGT_OST_HDL(flags, name, fn)                                   \
        TGT_RPC_HANDLER(OST_FIRST_OPC, flags, name, fn, &RQF_ ## name,  \
                        LUSTRE_OST_VERSION)
+#define TGT_OST_HDL_HP(flags, name, fn, hp)                            \
+       TGT_RPC_HANDLER_HP(OST_FIRST_OPC, flags, name, fn, hp,          \
+                          &RQF_ ## name, LUSTRE_OST_VERSION)
 
 /* MGS request with a format known in advance */
 #define TGT_MGS_HDL(flags, name, fn)                                   \
index 3c9a2d7..17b017c 100644 (file)
@@ -2033,7 +2033,6 @@ struct ptlrpc_request {
         struct ptlrpc_request_pool *rq_pool;
 
         struct lu_context           rq_session;
-        struct lu_context           rq_recov_session;
 
         /** request format description */
         struct req_capsule          rq_pill;
index 47279ba..83c19cf 100644 (file)
@@ -1899,34 +1899,26 @@ static int handle_recovery_req(struct ptlrpc_thread *thread,
                                struct ptlrpc_request *req,
                                svc_handler_t handler)
 {
-        int rc;
-        ENTRY;
+       int rc;
 
-        /**
-         * export can be evicted during recovery, no need to handle replays for
-         * it after that, discard such request silently
-         */
-        if (req->rq_export->exp_disconnected)
-                GOTO(reqcopy_put, rc = 0);
+       ENTRY;
 
-        rc = lu_context_init(&req->rq_recov_session, LCT_SERVER_SESSION);
-        if (rc) {
-                CERROR("Failure to initialize session: %d\n", rc);
-                GOTO(reqcopy_put, rc);
-        }
+       /**
+        * export can be evicted during recovery, no need to handle replays for
+        * it after that, discard such request silently
+        */
+       if (req->rq_export->exp_disconnected)
+               GOTO(reqcopy_put, rc = 0);
 
-        req->rq_recov_session.lc_thread = thread;
-        lu_context_enter(&req->rq_recov_session);
-        req->rq_svc_thread = thread;
-        req->rq_svc_thread->t_env->le_ses = &req->rq_recov_session;
+       req->rq_session.lc_thread = thread;
+       req->rq_svc_thread = thread;
+       req->rq_svc_thread->t_env->le_ses = &req->rq_session;
 
         /* thread context */
         lu_context_enter(&thread->t_env->le_ctx);
         (void)handler(req);
         lu_context_exit(&thread->t_env->le_ctx);
 
-        lu_context_exit(&req->rq_recov_session);
-        lu_context_fini(&req->rq_recov_session);
         /* don't reset timer for final stage */
         if (!exp_finished(req->rq_export)) {
                 int to = obd_timeout;
index 91e28b4..ba493de 100644 (file)
@@ -1555,7 +1555,6 @@ out:
        return rc;
 }
 
-
 static int ofd_quotactl(struct tgt_session_info *tsi)
 {
        struct obd_quotactl     *oqctl, *repoqc;
@@ -1585,6 +1584,302 @@ static int ofd_quotactl(struct tgt_session_info *tsi)
        RETURN(rc);
 }
 
+/* High priority request handlers for OFD */
+
+/* prolong locks for the current service time of the corresponding
+ * portal (= OST_IO_PORTAL)
+ */
+static inline int prolong_timeout(struct ptlrpc_request *req)
+{
+       struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
+
+       if (AT_OFF)
+               return obd_timeout / 2;
+
+       return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
+                  ldlm_timeout);
+}
+
+static int ofd_prolong_one_lock(struct tgt_session_info *tsi,
+                               struct ldlm_lock *lock,
+                               struct ldlm_extent *extent, int timeout)
+{
+
+       if (lock->l_flags & LDLM_FL_DESTROYED) /* lock already cancelled */
+               return 0;
+
+       /* XXX: never try to grab resource lock here because we're inside
+        * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
+        * res lock and then exp_bl_list_lock. */
+
+       if (!(lock->l_flags & LDLM_FL_AST_SENT))
+               /* ignore locks not being cancelled */
+               return 0;
+
+       LDLM_DEBUG(lock, "refreshed for req x"LPU64" ext("LPU64"->"LPU64") "
+                        "to %ds.\n", tgt_ses_req(tsi)->rq_xid, extent->start,
+                        extent->end, timeout);
+
+       /* OK. this is a possible lock the user holds doing I/O
+        * let's refresh eviction timer for it */
+       ldlm_refresh_waiting_lock(lock, timeout);
+       return 1;
+}
+
+static int ofd_prolong_extent_locks(struct tgt_session_info *tsi,
+                                   __u64 start, __u64 end)
+{
+       struct obd_export       *exp = tsi->tsi_exp;
+       struct obdo             *oa  = &tsi->tsi_ost_body->oa;
+       struct ldlm_extent       extent = {
+               .start = start,
+               .end = end
+       };
+       struct ldlm_lock        *lock;
+       int                      timeout = prolong_timeout(tgt_ses_req(tsi));
+       int                      lock_count = 0;
+
+       ENTRY;
+
+       if (oa->o_valid & OBD_MD_FLHANDLE) {
+               /* mostly a request should be covered by only one lock, try
+                * fast path. */
+               lock = ldlm_handle2lock(&oa->o_handle);
+               if (lock != NULL) {
+                       /* Fast path to check if the lock covers the whole IO
+                        * region exclusively. */
+                       if (lock->l_granted_mode == LCK_PW &&
+                           ldlm_extent_contain(&lock->l_policy_data.l_extent,
+                                               &extent)) {
+                               /* bingo */
+                               LASSERT(lock->l_export == exp);
+                               lock_count = ofd_prolong_one_lock(tsi, lock,
+                                                            &extent, timeout);
+                               LDLM_LOCK_PUT(lock);
+                               RETURN(lock_count);
+                       }
+                       LDLM_LOCK_PUT(lock);
+               }
+       }
+
+       spin_lock_bh(&exp->exp_bl_list_lock);
+       list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
+               LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+               LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
+
+               if (!ldlm_res_eq(&tsi->tsi_resid, &lock->l_resource->lr_name))
+                       continue;
+
+               if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
+                                        &extent))
+                       continue;
+
+               lock_count += ofd_prolong_one_lock(tsi, lock, &extent, timeout);
+       }
+       spin_unlock_bh(&exp->exp_bl_list_lock);
+
+       RETURN(lock_count);
+}
+
+/**
+ * Returns 1 if the given PTLRPC matches the given LDLM lock, or 0 if it does
+ * not.
+ */
+static int ofd_rw_hpreq_lock_match(struct ptlrpc_request *req,
+                                  struct ldlm_lock *lock)
+{
+       struct niobuf_remote    *rnb;
+       struct obd_ioobj        *ioo;
+       ldlm_mode_t              mode;
+       struct ldlm_extent       ext;
+       __u32                    opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+       ENTRY;
+
+       ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+       LASSERT(ioo != NULL);
+
+       rnb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+       LASSERT(rnb != NULL);
+
+       ext.start = rnb->offset;
+       rnb += ioo->ioo_bufcnt - 1;
+       ext.end = rnb->offset + rnb->len - 1;
+
+       LASSERT(lock->l_resource != NULL);
+       if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
+               RETURN(0);
+
+       mode = LCK_PW;
+       if (opc == OST_READ)
+               mode |= LCK_PR;
+
+       if (!(lock->l_granted_mode & mode))
+               RETURN(0);
+
+       RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
+}
+
+/**
+ * High-priority queue request check for whether the given PTLRPC request
+ * (\a req) is blocking an LDLM lock cancel.
+ *
+ * Returns 1 if the given PTLRPC request (\a req) is blocking an LDLM lock
+ * cancel and 0 if it is not; malformed requests are caught by LASSERT().
+ *
+ * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
+ * function looks only at OST_READs and OST_WRITEs.
+ */
+static int ofd_rw_hpreq_check(struct ptlrpc_request *req)
+{
+       struct tgt_session_info *tsi;
+       struct obd_ioobj        *ioo;
+       struct niobuf_remote    *rnb;
+       __u64                    start, end;
+       int                      lock_count;
+
+       ENTRY;
+
+       /* Don't use tgt_ses_info() to get session info, because lock_match()
+        * can be called while request has no processing thread yet. */
+       tsi = lu_context_key_get(&req->rq_session, &tgt_session_key);
+       LASSERT(tsi != NULL);
+
+       /*
+        * Use LASSERT below because malformed RPCs should have
+        * been filtered out in tgt_hpreq_handler().
+        */
+       ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+       LASSERT(ioo != NULL);
+
+       rnb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+       LASSERT(rnb != NULL);
+       LASSERT(!(rnb->flags & OBD_BRW_SRVLOCK));
+
+       start = rnb->offset;
+       rnb += ioo->ioo_bufcnt - 1;
+       end = rnb->offset + rnb->len - 1;
+
+       DEBUG_REQ(D_RPCTRACE, req, "%s %s: refresh rw locks: "DFID
+                                  " ("LPU64"->"LPU64")\n",
+                 tgt_name(tsi->tsi_tgt), current->comm,
+                 PFID(&tsi->tsi_fid), start, end);
+
+       lock_count = ofd_prolong_extent_locks(tsi, start, end);
+
+       CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
+              tgt_name(tsi->tsi_tgt), lock_count, req);
+
+       RETURN(lock_count > 0);
+}
+
+static void ofd_rw_hpreq_fini(struct ptlrpc_request *req)
+{
+       ofd_rw_hpreq_check(req);
+}
+
+/**
+ * Like ofd_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
+ */
+static int ofd_punch_hpreq_lock_match(struct ptlrpc_request *req,
+                                     struct ldlm_lock *lock)
+{
+       struct tgt_session_info *tsi;
+
+       /* Don't use tgt_ses_info() to get session info, because lock_match()
+        * can be called while request has no processing thread yet. */
+       tsi = lu_context_key_get(&req->rq_session, &tgt_session_key);
+       LASSERT(tsi != NULL);
+
+       LASSERT(tsi->tsi_ost_body != NULL);
+       if (tsi->tsi_ost_body->oa.o_valid & OBD_MD_FLHANDLE &&
+           tsi->tsi_ost_body->oa.o_handle.cookie == lock->l_handle.h_cookie)
+               return 1;
+
+       return 0;
+}
+
+/**
+ * Like ofd_rw_hpreq_check(), but for OST_PUNCH RPCs.
+ */
+static int ofd_punch_hpreq_check(struct ptlrpc_request *req)
+{
+       struct tgt_session_info *tsi;
+       struct obdo             *oa;
+       int                      lock_count;
+
+       ENTRY;
+
+       /* Don't use tgt_ses_info() to get session info, because lock_match()
+        * can be called while request has no processing thread yet. */
+       tsi = lu_context_key_get(&req->rq_session, &tgt_session_key);
+       LASSERT(tsi != NULL);
+       oa = &tsi->tsi_ost_body->oa;
+
+       LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS &&
+                 oa->o_flags & OBD_FL_SRVLOCK));
+
+       CDEBUG(D_DLMTRACE,
+              "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
+              tgt_name(tsi->tsi_tgt), tsi->tsi_resid.name[0],
+              tsi->tsi_resid.name[1], oa->o_size, oa->o_blocks);
+
+       lock_count = ofd_prolong_extent_locks(tsi, oa->o_size, oa->o_blocks);
+
+       CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
+              tgt_name(tsi->tsi_tgt), lock_count, req);
+
+       RETURN(lock_count > 0);
+}
+
+static void ofd_punch_hpreq_fini(struct ptlrpc_request *req)
+{
+       ofd_punch_hpreq_check(req);
+}
+
+struct ptlrpc_hpreq_ops ofd_hpreq_rw = {
+       .hpreq_lock_match       = ofd_rw_hpreq_lock_match,
+       .hpreq_check            = ofd_rw_hpreq_check,
+       .hpreq_fini             = ofd_rw_hpreq_fini
+};
+
+struct ptlrpc_hpreq_ops ofd_hpreq_punch = {
+       .hpreq_lock_match       = ofd_punch_hpreq_lock_match,
+       .hpreq_check            = ofd_punch_hpreq_check,
+       .hpreq_fini             = ofd_punch_hpreq_fini
+};
+
+/** Assign high priority operations to the IO requests */
+static void ofd_hp_brw(struct tgt_session_info *tsi)
+{
+       struct niobuf_remote    *rnb;
+       struct obd_ioobj        *ioo;
+
+       ENTRY;
+
+       ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
+       LASSERT(ioo != NULL); /* must exist after request preprocessing */
+       if (ioo->ioo_bufcnt > 0) {
+               rnb = req_capsule_client_get(tsi->tsi_pill, &RMF_NIOBUF_REMOTE);
+               LASSERT(rnb != NULL); /* must exist after request preprocessing */
+
+               /* no high priority if server lock is needed */
+               if (rnb->flags & OBD_BRW_SRVLOCK)
+                       return;
+       }
+       tgt_ses_req(tsi)->rq_ops = &ofd_hpreq_rw;
+}
+
+static void ofd_hp_punch(struct tgt_session_info *tsi)
+{
+       LASSERT(tsi->tsi_ost_body != NULL); /* must exists if we are here */
+       /* no high-priority if server lock is needed */
+       if (tsi->tsi_ost_body->oa.o_valid & OBD_MD_FLFLAGS &&
+           tsi->tsi_ost_body->oa.o_flags & OBD_FL_SRVLOCK)
+               return;
+       tgt_ses_req(tsi)->rq_ops = &ofd_hpreq_punch;
+}
+
 #define OBD_FAIL_OST_READ_NET  OBD_FAIL_OST_BRW_NET
 #define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET
 #define OST_BRW_READ   OST_READ
@@ -1609,11 +1904,15 @@ TGT_OST_HDL(0           | HABEO_REFERO | MUTABOR,
 TGT_OST_HDL(0          | HABEO_REFERO | MUTABOR,
                                        OST_DESTROY,    ofd_destroy_hdl),
 TGT_OST_HDL(0          | HABEO_REFERO, OST_STATFS,     ofd_statfs_hdl),
-TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO,        OST_BRW_READ,   tgt_brw_read),
+TGT_OST_HDL_HP(HABEO_CORPUS| HABEO_REFERO,
+                                       OST_BRW_READ,   tgt_brw_read,
+                                                       ofd_hp_brw),
 /* don't set CORPUS flag for brw_write because -ENOENT may be valid case */
-TGT_OST_HDL(MUTABOR,                   OST_BRW_WRITE,  tgt_brw_write),
-TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR,
-                                       OST_PUNCH,      ofd_punch_hdl),
+TGT_OST_HDL_HP(HABEO_CORPUS| MUTABOR,  OST_BRW_WRITE,  tgt_brw_write,
+                                                       ofd_hp_brw),
+TGT_OST_HDL_HP(HABEO_CORPUS| HABEO_REFERO | MUTABOR,
+                                       OST_PUNCH,      ofd_punch_hdl,
+                                                       ofd_hp_punch),
 TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO,        OST_SYNC,       ofd_sync_hdl),
 TGT_OST_HDL(0          | HABEO_REFERO, OST_QUOTACTL,   ofd_quotactl),
 };
index 7567acf..d4da00d 100644 (file)
@@ -67,473 +67,6 @@ static char *oss_io_cpts;
 CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
                "CPU partitions OSS IO threads should run on");
 
-/**
- * Validate oa from client.
- * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
- * req are valid.
- *    a. objects in Single MDT FS  seq = FID_SEQ_OST_MDT0, oi_id != 0
- *    b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
- *       pack ost_id. Because non-zero oi_seq will make it diffcult to tell
- *       whether this is oi_fid or real ostid. So it will check
- *       OBD_CONNECT_FID, then convert the ostid to FID for old client.
- *    c. Old FID-disable osc will send IDIF.
- *    d. new FID-enable osc/osp will send normal FID.
- *
- * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
- * be used for LAST_ID file, and only being accessed inside OST now.
- */
-static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
-                            struct obd_ioobj *ioobj)
-{
-       int rc = 0;
-
-       if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
-                    fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) {
-               /* Sigh 2.[123] client still sends echo req with oi_id = 0
-                * during create, and we will reset this to 1, since this
-                * oi_id is basically useless in the following create process,
-                * but oi_id == 0 will make it difficult to tell whether it is
-                * real FID or ost_id. */
-               oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
-               oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
-               oa->o_oi.oi_fid.f_ver = 0;
-       } else {
-               if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
-                       GOTO(out, rc = -EPROTO);
-
-               /* Note: this check might be forced in 2.5 or 2.6, i.e.
-                * all of the requests are required to setup FLGROUP */
-               if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
-                       ostid_set_seq_mdt0(&oa->o_oi);
-                       if (ioobj)
-                               ostid_set_seq_mdt0(&ioobj->ioo_oid);
-                       oa->o_valid |= OBD_MD_FLGROUP;
-               }
-
-               if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
-                       GOTO(out, rc = -EPROTO);
-       }
-
-       if (ioobj != NULL) {
-               unsigned max_brw = ioobj_max_brw_get(ioobj);
-
-               if (unlikely((max_brw & (max_brw - 1)) != 0)) {
-                       CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
-                              ": rc = -EPROTO\n", exp->exp_obd->obd_name,
-                              obd_export_nid2str(exp), max_brw,
-                              POSTID(&oa->o_oi));
-                       GOTO(out, rc = -EPROTO);
-               }
-               ioobj->ioo_oid = oa->o_oi;
-       }
-
-out:
-       if (rc != 0)
-               CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
-                      exp->exp_obd->obd_name, obd_export_nid2str(exp),
-                      oa ? ostid_seq(&oa->o_oi) : -1,
-                      oa ? ostid_id(&oa->o_oi) : -1, rc);
-       return rc;
-}
-
-struct ost_prolong_data {
-        struct ptlrpc_request *opd_req;
-        struct obd_export     *opd_exp;
-        struct obdo           *opd_oa;
-        struct ldlm_res_id     opd_resid;
-        struct ldlm_extent     opd_extent;
-        ldlm_mode_t            opd_mode;
-        unsigned int           opd_locks;
-        int                    opd_timeout;
-};
-
-/* prolong locks for the current service time of the corresponding
- * portal (= OST_IO_PORTAL)
- */
-static inline int prolong_timeout(struct ptlrpc_request *req)
-{
-       struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
-
-       if (AT_OFF)
-               return obd_timeout / 2;
-
-       return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
-                  ldlm_timeout);
-}
-
-static void ost_prolong_lock_one(struct ost_prolong_data *opd,
-                                 struct ldlm_lock *lock)
-{
-       LASSERT(lock->l_export == opd->opd_exp);
-
-       if (lock->l_flags & LDLM_FL_DESTROYED) /* lock already cancelled */
-               return;
-
-        /* XXX: never try to grab resource lock here because we're inside
-         * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
-         * res lock and then exp_bl_list_lock. */
-
-        if (!(lock->l_flags & LDLM_FL_AST_SENT))
-                /* ignore locks not being cancelled */
-                return;
-
-        LDLM_DEBUG(lock,
-                   "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
-                   opd->opd_req->rq_xid, opd->opd_extent.start,
-                   opd->opd_extent.end, opd->opd_timeout);
-
-        /* OK. this is a possible lock the user holds doing I/O
-         * let's refresh eviction timer for it */
-        ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
-        ++opd->opd_locks;
-}
-
-static void ost_prolong_locks(struct ost_prolong_data *data)
-{
-        struct obd_export *exp = data->opd_exp;
-        struct obdo       *oa  = data->opd_oa;
-        struct ldlm_lock  *lock;
-        ENTRY;
-
-        if (oa->o_valid & OBD_MD_FLHANDLE) {
-                /* mostly a request should be covered by only one lock, try
-                 * fast path. */
-                lock = ldlm_handle2lock(&oa->o_handle);
-                if (lock != NULL) {
-                        /* Fast path to check if the lock covers the whole IO
-                         * region exclusively. */
-                        if (lock->l_granted_mode == LCK_PW &&
-                            ldlm_extent_contain(&lock->l_policy_data.l_extent,
-                                                &data->opd_extent)) {
-                                /* bingo */
-                                ost_prolong_lock_one(data, lock);
-                                LDLM_LOCK_PUT(lock);
-                                RETURN_EXIT;
-                        }
-                        LDLM_LOCK_PUT(lock);
-                }
-        }
-
-
-       spin_lock_bh(&exp->exp_bl_list_lock);
-        cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
-                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
-                LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
-
-                if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
-                        continue;
-
-                if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
-                                         &data->opd_extent))
-                        continue;
-
-                ost_prolong_lock_one(data, lock);
-        }
-       spin_unlock_bh(&exp->exp_bl_list_lock);
-
-       EXIT;
-}
-
-/**
- * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
- * not.
- */
-static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
-                                   struct ldlm_lock *lock)
-{
-        struct niobuf_remote *nb;
-        struct obd_ioobj *ioo;
-        int mode, opc;
-        struct ldlm_extent ext;
-        ENTRY;
-
-        opc = lustre_msg_get_opc(req->rq_reqmsg);
-        LASSERT(opc == OST_READ || opc == OST_WRITE);
-
-        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
-        LASSERT(ioo != NULL);
-
-        nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
-        LASSERT(nb != NULL);
-
-        ext.start = nb->offset;
-        nb += ioo->ioo_bufcnt - 1;
-        ext.end = nb->offset + nb->len - 1;
-
-       LASSERT(lock->l_resource != NULL);
-       if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
-               RETURN(0);
-
-        mode = LCK_PW;
-        if (opc == OST_READ)
-                mode |= LCK_PR;
-        if (!(lock->l_granted_mode & mode))
-                RETURN(0);
-
-        RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
-}
-
-/**
- * High-priority queue request check for whether the given PTLRPC request (\a
- * req) is blocking an LDLM lock cancel.
- *
- * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
- * cancel, 0 if it is not, and -EFAULT if the request is malformed.
- *
- * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
- * function looks only at OST_READs and OST_WRITEs.
- */
-static int ost_rw_hpreq_check(struct ptlrpc_request *req)
-{
-        struct obd_device *obd = req->rq_export->exp_obd;
-        struct ost_body *body;
-        struct obd_ioobj *ioo;
-        struct niobuf_remote *nb;
-        struct ost_prolong_data opd = { 0 };
-        int mode, opc;
-        ENTRY;
-
-        /*
-         * Use LASSERT to do sanity check because malformed RPCs should have
-         * been filtered out in ost_hpreq_handler().
-         */
-        opc = lustre_msg_get_opc(req->rq_reqmsg);
-        LASSERT(opc == OST_READ || opc == OST_WRITE);
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body != NULL);
-
-        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
-        LASSERT(ioo != NULL);
-
-        nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
-        LASSERT(nb != NULL);
-        LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
-
-       ostid_build_res_name(&ioo->ioo_oid, &opd.opd_resid);
-
-        opd.opd_req = req;
-        mode = LCK_PW;
-        if (opc == OST_READ)
-                mode |= LCK_PR;
-        opd.opd_mode = mode;
-        opd.opd_exp = req->rq_export;
-        opd.opd_oa  = &body->oa;
-        opd.opd_extent.start = nb->offset;
-        nb += ioo->ioo_bufcnt - 1;
-        opd.opd_extent.end = nb->offset + nb->len - 1;
-        opd.opd_timeout = prolong_timeout(req);
-
-       DEBUG_REQ(D_RPCTRACE, req,
-              "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
-              obd->obd_name, current->comm,
-              opd.opd_resid.name[0], opd.opd_resid.name[1],
-              opd.opd_extent.start, opd.opd_extent.end);
-
-        ost_prolong_locks(&opd);
-
-        CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
-               obd->obd_name, opd.opd_locks, req);
-
-        RETURN(opd.opd_locks > 0);
-}
-
-static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
-{
-        (void)ost_rw_hpreq_check(req);
-}
-
-/**
- * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
- */
-static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
-                                      struct ldlm_lock *lock)
-{
-        struct ost_body *body;
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body != NULL);
-
-        if (body->oa.o_valid & OBD_MD_FLHANDLE &&
-            body->oa.o_handle.cookie == lock->l_handle.h_cookie)
-                RETURN(1);
-
-        RETURN(0);
-}
-
-/**
- * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
- */
-static int ost_punch_hpreq_check(struct ptlrpc_request *req)
-{
-        struct obd_device *obd = req->rq_export->exp_obd;
-        struct ost_body *body;
-        struct obdo *oa;
-        struct ost_prolong_data opd = { 0 };
-        __u64 start, end;
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        LASSERT(body != NULL);
-
-        oa = &body->oa;
-        LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
-                !(oa->o_flags & OBD_FL_SRVLOCK));
-
-        start = oa->o_size;
-        end = start + oa->o_blocks;
-
-        opd.opd_req = req;
-        opd.opd_mode = LCK_PW;
-        opd.opd_exp = req->rq_export;
-        opd.opd_oa  = oa;
-        opd.opd_extent.start = start;
-        opd.opd_extent.end   = end;
-        if (oa->o_blocks == OBD_OBJECT_EOF)
-                opd.opd_extent.end = OBD_OBJECT_EOF;
-        opd.opd_timeout = prolong_timeout(req);
-
-       ostid_build_res_name(&oa->o_oi, &opd.opd_resid);
-
-        CDEBUG(D_DLMTRACE,
-               "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
-               obd->obd_name,
-               opd.opd_resid.name[0], opd.opd_resid.name[1],
-               opd.opd_extent.start, opd.opd_extent.end);
-
-        ost_prolong_locks(&opd);
-
-        CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
-               obd->obd_name, opd.opd_locks, req);
-
-        RETURN(opd.opd_locks > 0);
-}
-
-static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
-{
-        (void)ost_punch_hpreq_check(req);
-}
-
-struct ptlrpc_hpreq_ops ost_hpreq_rw = {
-        .hpreq_lock_match = ost_rw_hpreq_lock_match,
-        .hpreq_check      = ost_rw_hpreq_check,
-        .hpreq_fini       = ost_rw_hpreq_fini
-};
-
-struct ptlrpc_hpreq_ops ost_hpreq_punch = {
-        .hpreq_lock_match = ost_punch_hpreq_lock_match,
-        .hpreq_check      = ost_punch_hpreq_check,
-        .hpreq_fini       = ost_punch_hpreq_fini
-};
-
-/** Assign high priority operations to the request if needed. */
-static int ost_io_hpreq_handler(struct ptlrpc_request *req)
-{
-        ENTRY;
-        if (req->rq_export) {
-                int opc = lustre_msg_get_opc(req->rq_reqmsg);
-                struct ost_body *body;
-
-                if (opc == OST_READ || opc == OST_WRITE) {
-                        struct niobuf_remote *nb;
-                        struct obd_ioobj *ioo;
-                        int objcount, niocount;
-                        int rc;
-                        int i;
-
-                        /* RPCs on the H-P queue can be inspected before
-                         * ost_handler() initializes their pills, so we
-                         * initialize that here.  Capsule initialization is
-                         * idempotent, as is setting the pill's format (provided
-                         * it doesn't change).
-                         */
-                        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-                        if (opc == OST_READ)
-                                req_capsule_set(&req->rq_pill,
-                                                &RQF_OST_BRW_READ);
-                        else
-                                req_capsule_set(&req->rq_pill,
-                                                &RQF_OST_BRW_WRITE);
-
-                        body = req_capsule_client_get(&req->rq_pill,
-                                                      &RMF_OST_BODY);
-                        if (body == NULL) {
-                                CERROR("Missing/short ost_body\n");
-                                RETURN(-EFAULT);
-                        }
-
-                        objcount = req_capsule_get_size(&req->rq_pill,
-                                                        &RMF_OBD_IOOBJ,
-                                                        RCL_CLIENT) /
-                                                        sizeof(*ioo);
-                        if (objcount == 0) {
-                                CERROR("Missing/short ioobj\n");
-                                RETURN(-EFAULT);
-                        }
-                        if (objcount > 1) {
-                                CERROR("too many ioobjs (%d)\n", objcount);
-                                RETURN(-EFAULT);
-                        }
-
-                        ioo = req_capsule_client_get(&req->rq_pill,
-                                                     &RMF_OBD_IOOBJ);
-                        if (ioo == NULL) {
-                                CERROR("Missing/short ioobj\n");
-                                RETURN(-EFAULT);
-                        }
-
-                        rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
-                        if (rc) {
-                                CERROR("invalid object ids\n");
-                                RETURN(rc);
-                        }
-
-                        for (niocount = i = 0; i < objcount; i++) {
-                                if (ioo[i].ioo_bufcnt == 0) {
-                                        CERROR("ioo[%d] has zero bufcnt\n", i);
-                                        RETURN(-EFAULT);
-                                }
-                                niocount += ioo[i].ioo_bufcnt;
-                        }
-                        if (niocount > PTLRPC_MAX_BRW_PAGES) {
-                                DEBUG_REQ(D_RPCTRACE, req,
-                                          "bulk has too many pages (%d)",
-                                          niocount);
-                                RETURN(-EFAULT);
-                        }
-
-                        nb = req_capsule_client_get(&req->rq_pill,
-                                                    &RMF_NIOBUF_REMOTE);
-                        if (nb == NULL) {
-                                CERROR("Missing/short niobuf\n");
-                                RETURN(-EFAULT);
-                        }
-
-                        if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
-                                req->rq_ops = &ost_hpreq_rw;
-                } else if (opc == OST_PUNCH) {
-                        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-                        req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
-
-                        body = req_capsule_client_get(&req->rq_pill,
-                                                      &RMF_OST_BODY);
-                        if (body == NULL) {
-                                CERROR("Missing/short ost_body\n");
-                                RETURN(-EFAULT);
-                        }
-
-                        if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
-                            !(body->oa.o_flags & OBD_FL_SRVLOCK))
-                                req->rq_ops = &ost_hpreq_punch;
-                }
-        }
-        RETURN(0);
-}
-
 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
 
 static struct cfs_cpt_table    *ost_io_cptable;
@@ -698,7 +231,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                        .so_thr_init            = tgt_io_thread_init,
                        .so_thr_done            = tgt_io_thread_done,
                        .so_req_handler         = tgt_request_handle,
-                       .so_hpreq_handler       = ost_io_hpreq_handler,
+                       .so_hpreq_handler       = tgt_hpreq_handler,
                        .so_req_printer         = target_print_req,
                },
        };
index 5a2b2d9..2cb3a72 100644 (file)
@@ -289,7 +289,7 @@ static inline int ll_rpc_recoverable_error(int rc)
         return (rc == -ENOTCONN || rc == -ENODEV);
 }
 
-#ifdef HAVE_SERVER_SUPPORT
+#if defined HAVE_SERVER_SUPPORT && defined(__KERNEL__)
 int tgt_mod_init(void);
 void tgt_mod_exit(void);
 #else
index f958a8a..56e5d5f 100644 (file)
@@ -56,111 +56,96 @@ extern struct mutex ptlrpcd_mutex;
 
 __init int ptlrpc_init(void)
 {
-        int rc, cleanup_phase = 0;
-        ENTRY;
+       int rc;
 
-        lustre_assert_wire_constants();
+       ENTRY;
+
+       lustre_assert_wire_constants();
 #if RS_DEBUG
        spin_lock_init(&ptlrpc_rs_debug_lock);
 #endif
        mutex_init(&ptlrpc_all_services_mutex);
        mutex_init(&pinger_mutex);
        mutex_init(&ptlrpcd_mutex);
-        ptlrpc_init_xid();
+       ptlrpc_init_xid();
 
        rc = req_layout_init();
        if (rc)
                RETURN(rc);
 
+       rc = tgt_mod_init();
+       if (rc)
+               GOTO(err_layout, rc);
+
        rc = ptlrpc_hr_init();
        if (rc)
-               RETURN(rc);
+               GOTO(err_tgt, rc);
 
-       cleanup_phase = 1;
        rc = ptlrpc_request_cache_init();
        if (rc)
-               GOTO(cleanup, rc);
+               GOTO(err_hr, rc);
 
-       cleanup_phase = 2;
        rc = ptlrpc_init_portals();
        if (rc)
-               GOTO(cleanup, rc);
-
-       cleanup_phase = 3;
+               GOTO(err_cache, rc);
 
        rc = ptlrpc_connection_init();
        if (rc)
-               GOTO(cleanup, rc);
+               GOTO(err_portals, rc);
 
-       cleanup_phase = 4;
        ptlrpc_put_connection_superhack = ptlrpc_connection_put;
 
        rc = ptlrpc_start_pinger();
        if (rc)
-               GOTO(cleanup, rc);
+               GOTO(err_conn, rc);
 
-       cleanup_phase = 5;
        rc = ldlm_init();
        if (rc)
-               GOTO(cleanup, rc);
+               GOTO(err_pinger, rc);
 
-       cleanup_phase = 6;
        rc = sptlrpc_init();
        if (rc)
-               GOTO(cleanup, rc);
+               GOTO(err_ldlm, rc);
 
-       cleanup_phase = 7;
        rc = ptlrpc_nrs_init();
        if (rc)
-               GOTO(cleanup, rc);
-
-#ifdef __KERNEL__
-       cleanup_phase = 8;
-       rc = tgt_mod_init();
-       if (rc)
-               GOTO(cleanup, rc);
-#endif
-        RETURN(0);
-
-cleanup:
-        switch(cleanup_phase) {
-#ifdef __KERNEL__
-       case 8:
-               ptlrpc_nrs_fini();
-#endif
-       case 7:
-               sptlrpc_fini();
-       case 6:
-               ldlm_exit();
-       case 5:
-               ptlrpc_stop_pinger();
-       case 4:
-               ptlrpc_connection_fini();
-       case 3:
-               ptlrpc_exit_portals();
-       case 2:
-               ptlrpc_request_cache_fini();
-        case 1:
-                ptlrpc_hr_fini();
-                req_layout_fini();
-        default: ;
-        }
-
-        return rc;
+               GOTO(err_sptlrpc, rc);
+
+       RETURN(0);
+err_sptlrpc:
+       sptlrpc_fini();
+err_ldlm:
+       ldlm_exit();
+err_pinger:
+       ptlrpc_stop_pinger();
+err_conn:
+       ptlrpc_connection_fini();
+err_portals:
+       ptlrpc_exit_portals();
+err_cache:
+       ptlrpc_request_cache_fini();
+err_hr:
+       ptlrpc_hr_fini();
+err_tgt:
+       tgt_mod_exit();
+err_layout:
+       req_layout_fini();
+       return rc;
 }
 
 #ifdef __KERNEL__
 static void __exit ptlrpc_exit(void)
 {
-       tgt_mod_exit();
        ptlrpc_nrs_fini();
-        sptlrpc_fini();
-        ldlm_exit();
-        ptlrpc_stop_pinger();
-        ptlrpc_exit_portals();
+       sptlrpc_fini();
+       ldlm_exit();
+       ptlrpc_stop_pinger();
+       ptlrpc_exit_portals();
        ptlrpc_request_cache_fini();
-        ptlrpc_hr_fini();
-        ptlrpc_connection_fini();
+       ptlrpc_hr_fini();
+       ptlrpc_connection_fini();
+       tgt_mod_exit();
+       req_layout_fini();
 }
 
 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
index ca2aa01..b26dd07 100644 (file)
@@ -885,12 +885,17 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
        struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
        struct ptlrpc_service_part        *svcpt = rqbd->rqbd_svcpt;
        struct ptlrpc_service             *svc = svcpt->scp_service;
-        int                                refcount;
-        cfs_list_t                        *tmp;
-        cfs_list_t                        *nxt;
+       int                                refcount;
+       cfs_list_t                        *tmp;
+       cfs_list_t                        *nxt;
 
-        if (!cfs_atomic_dec_and_test(&req->rq_refcount))
-                return;
+       if (!cfs_atomic_dec_and_test(&req->rq_refcount))
+               return;
+
+       if (req->rq_session.lc_state == LCS_ENTERED) {
+               lu_context_exit(&req->rq_session);
+               lu_context_fini(&req->rq_session);
+       }
 
        if (req->rq_at_linked) {
                spin_lock(&svcpt->scp_at_lock);
@@ -1024,11 +1029,6 @@ static void ptlrpc_server_finish_request(struct ptlrpc_service_part *svcpt,
 {
        ptlrpc_server_hpreq_fini(req);
 
-       if (req->rq_session.lc_thread != NULL) {
-               lu_context_exit(&req->rq_session);
-               lu_context_fini(&req->rq_session);
-       }
-
        ptlrpc_server_drop_request(req);
 }
 
@@ -1676,6 +1676,13 @@ static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
        if (rc < 0)
                RETURN(rc);
 
+       /* From now on the current thread is no longer the processing thread
+        * for this request, but the request is in exp_hp_list and can be
+        * found there. Remove all relations between request and old thread. */
+       req->rq_svc_thread->t_env->le_ses = NULL;
+       req->rq_svc_thread = NULL;
+       req->rq_session.lc_thread = NULL;
+
        ptlrpc_nrs_req_add(svcpt, req, !!rc);
 
        RETURN(0);
@@ -1972,7 +1979,7 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
                }
                req->rq_session.lc_thread = thread;
                lu_context_enter(&req->rq_session);
-               req->rq_svc_thread->t_env->le_ses = &req->rq_session;
+               thread->t_env->le_ses = &req->rq_session;
        }
 
        ptlrpc_at_add_timed(req);
@@ -2077,9 +2084,8 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
        /* re-assign request and sesson thread to the current one */
        request->rq_svc_thread = thread;
        if (thread != NULL) {
-               LASSERT(request->rq_session.lc_thread != NULL);
+               LASSERT(request->rq_session.lc_thread == NULL);
                request->rq_session.lc_thread = thread;
-               request->rq_session.lc_cookie = 0x55;
                thread->t_env->le_ses = &request->rq_session;
        }
        svc->srv_ops.so_req_handler(request);
@@ -2567,10 +2573,11 @@ static int ptlrpc_main(void *arg)
                        ptlrpc_start_thread(svcpt, 0);
                 }
 
+               /* reset le_ses to initial state */
+               env->le_ses = NULL;
                /* Process all incoming reqs before handling any */
                if (ptlrpc_server_request_incoming(svcpt)) {
                        lu_context_enter(&env->le_ctx);
-                       env->le_ses = NULL;
                        ptlrpc_server_handle_req_in(svcpt, thread);
                        lu_context_exit(&env->le_ctx);
 
index b0c26e0..376f345 100644 (file)
@@ -107,6 +107,9 @@ static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags)
        } else {
                rc = PTR_ERR(obj);
        }
+
+       tsi->tsi_fid = body->fid1;
+
        RETURN(rc);
 }
 
@@ -168,12 +171,64 @@ out:
 }
 EXPORT_SYMBOL(tgt_validate_obdo);
 
+static int tgt_io_data_unpack(struct tgt_session_info *tsi, struct ost_id *oi)
+{
+       unsigned                 max_brw;
+       struct niobuf_remote    *rnb;
+       struct obd_ioobj        *ioo;
+       int                      obj_count;
+
+       ENTRY;
+
+       ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
+       if (ioo == NULL)
+               RETURN(-EPROTO);
+
+       rnb = req_capsule_client_get(tsi->tsi_pill, &RMF_NIOBUF_REMOTE);
+       if (rnb == NULL)
+               RETURN(-EPROTO);
+
+       max_brw = ioobj_max_brw_get(ioo);
+       if (unlikely((max_brw & (max_brw - 1)) != 0)) {
+               CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
+                      ": rc = %d\n", tgt_name(tsi->tsi_tgt),
+                      obd_export_nid2str(tsi->tsi_exp), max_brw,
+                      POSTID(oi), -EPROTO);
+               RETURN(-EPROTO);
+       }
+       ioo->ioo_oid = *oi;
+
+       obj_count = req_capsule_get_size(tsi->tsi_pill, &RMF_OBD_IOOBJ,
+                                       RCL_CLIENT) / sizeof(*ioo);
+       if (obj_count == 0) {
+               CERROR("%s: short ioobj\n", tgt_name(tsi->tsi_tgt));
+               RETURN(-EPROTO);
+       } else if (obj_count > 1) {
+               CERROR("%s: too many ioobjs (%d)\n", tgt_name(tsi->tsi_tgt),
+                      obj_count);
+               RETURN(-EPROTO);
+       }
+
+       if (ioo->ioo_bufcnt == 0) {
+               CERROR("%s: ioo has zero bufcnt\n", tgt_name(tsi->tsi_tgt));
+               RETURN(-EPROTO);
+       }
+
+       if (ioo->ioo_bufcnt > PTLRPC_MAX_BRW_PAGES) {
+               DEBUG_REQ(D_RPCTRACE, tgt_ses_req(tsi),
+                         "bulk has too many pages (%d)",
+                         ioo->ioo_bufcnt);
+               RETURN(-EPROTO);
+       }
+
+       RETURN(0);
+}
+
 static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
 {
        struct ost_body         *body;
        struct req_capsule      *pill = tsi->tsi_pill;
        struct lustre_capa      *capa;
-       struct obd_ioobj        *ioo;
        int                      rc;
 
        ENTRY;
@@ -187,7 +242,7 @@ static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
                RETURN(rc);
 
        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
-               capa = req_capsule_client_get(tsi->tsi_pill, &RMF_CAPA1);
+               capa = req_capsule_client_get(pill, &RMF_CAPA1);
                if (capa == NULL) {
                        CERROR("%s: OSSCAPA flag is set without capability\n",
                               tgt_name(tsi->tsi_tgt));
@@ -198,26 +253,9 @@ static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
        tsi->tsi_ost_body = body;
 
        if (req_capsule_has_field(pill, &RMF_OBD_IOOBJ, RCL_CLIENT)) {
-               unsigned                 max_brw;
-               struct niobuf_remote    *rnb;
-
-               ioo = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
-               if (ioo == NULL)
-                       RETURN(-EPROTO);
-
-               rnb = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
-               if (rnb == NULL)
-                       RETURN(-EPROTO);
-
-               max_brw = ioobj_max_brw_get(ioo);
-               if (unlikely((max_brw & (max_brw - 1)) != 0)) {
-                       CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
-                              ": rc = %d\n", tgt_name(tsi->tsi_tgt),
-                              obd_export_nid2str(tsi->tsi_exp), max_brw,
-                              POSTID(&body->oa.o_oi), -EPROTO);
-                       RETURN(-EPROTO);
-               }
-               ioo->ioo_oid = body->oa.o_oi;
+               rc = tgt_io_data_unpack(tsi, &body->oa.o_oi);
+               if (rc < 0)
+                       RETURN(rc);
        }
 
        if (!(body->oa.o_valid & OBD_MD_FLID)) {
@@ -251,32 +289,72 @@ static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
        RETURN(rc);
 }
 
-static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
+/*
+ * Do necessary preprocessing according to handler ->th_flags.
+ */
+static int tgt_request_preprocess(struct tgt_session_info *tsi,
+                                 struct tgt_handler *h,
+                                 struct ptlrpc_request *req)
 {
        struct req_capsule      *pill = tsi->tsi_pill;
-       int                      rc;
+       __u32                    flags = h->th_flags;
+       int                      rc = 0;
 
        ENTRY;
 
-       if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
-               rc = tgt_mdt_body_unpack(tsi, flags);
-       } else if (req_capsule_has_field(pill, &RMF_OST_BODY, RCL_CLIENT)) {
-               rc = tgt_ost_body_unpack(tsi, flags);
-       } else {
-               rc = 0;
+       if (tsi->tsi_preprocessed)
+               RETURN(0);
+
+       LASSERT(h->th_act != NULL);
+       LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
+       LASSERT(current->journal_info == NULL);
+
+       LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
+                    h->th_fmt != NULL));
+       if (h->th_fmt != NULL) {
+               req_capsule_set(pill, h->th_fmt);
+               if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
+                       rc = tgt_mdt_body_unpack(tsi, flags);
+                       if (rc < 0)
+                               RETURN(rc);
+               } else if (req_capsule_has_field(pill, &RMF_OST_BODY,
+                                                RCL_CLIENT)) {
+                       rc = tgt_ost_body_unpack(tsi, flags);
+                       if (rc < 0)
+                               RETURN(rc);
+               }
        }
 
-       if (rc == 0 && flags & HABEO_REFERO) {
-               /* Pack reply */
-               if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
-                       req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
-                                            tsi->tsi_mdt_body->eadatasize);
-               if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
-                       req_capsule_set_size(pill, &RMF_LOGCOOKIES,
-                                            RCL_SERVER, 0);
+       if (flags & MUTABOR && tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
+               RETURN(-EROFS);
+
+       if (flags & HABEO_CLAVIS) {
+               struct ldlm_request *dlm_req;
 
-               rc = req_capsule_server_pack(pill);
+               LASSERT(h->th_fmt != NULL);
+
+               dlm_req = req_capsule_client_get(pill, &RMF_DLM_REQ);
+               if (dlm_req != NULL) {
+                       if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
+                                    LDLM_IBITS &&
+                                    dlm_req->lock_desc.l_policy_data.\
+                                    l_inodebits.bits == 0)) {
+                               /*
+                                * Lock without inodebits makes no sense and
+                                * will oops later in ldlm. If a client fails
+                                * to set such bits, do not trigger ASSERTION.
+                                *
+                                * For liblustre flock case, it may be zero.
+                                */
+                               rc = -EPROTO;
+                       } else {
+                               tsi->tsi_dlm_req = dlm_req;
+                       }
+               } else {
+                       rc = -EFAULT;
+               }
        }
+       tsi->tsi_preprocessed = 1;
        RETURN(rc);
 }
 
@@ -291,14 +369,9 @@ static int tgt_handle_request0(struct tgt_session_info *tsi,
 {
        int      serious = 0;
        int      rc;
-       __u32    flags;
 
        ENTRY;
 
-       LASSERT(h->th_act != NULL);
-       LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
-       LASSERT(current->journal_info == NULL);
-
        /*
         * Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try
         * to put same checks into handlers like mdt_close(), mdt_reint(),
@@ -313,44 +386,21 @@ static int tgt_handle_request0(struct tgt_session_info *tsi,
        if (OBD_FAIL_CHECK_ORSET(h->th_fail_id, OBD_FAIL_ONCE))
                RETURN(0);
 
-       rc = 0;
-       flags = h->th_flags;
-       LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
-                    h->th_fmt != NULL));
-       if (h->th_fmt != NULL) {
-               req_capsule_set(tsi->tsi_pill, h->th_fmt);
-               rc = tgt_unpack_req_pack_rep(tsi, flags);
-       }
-
-       if (rc == 0 && flags & MUTABOR &&
-           tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
-               rc = -EROFS;
-
-       if (rc == 0 && flags & HABEO_CLAVIS) {
-               struct ldlm_request *dlm_req;
-
-               LASSERT(h->th_fmt != NULL);
+       rc = tgt_request_preprocess(tsi, h, req);
+       /* pack reply if reply format is fixed */
+       if (rc == 0 && h->th_flags & HABEO_REFERO) {
+               /* Pack reply */
+               if (req_capsule_has_field(tsi->tsi_pill, &RMF_MDT_MD,
+                                         RCL_SERVER))
+                       req_capsule_set_size(tsi->tsi_pill, &RMF_MDT_MD,
+                                            RCL_SERVER,
+                                            tsi->tsi_mdt_body->eadatasize);
+               if (req_capsule_has_field(tsi->tsi_pill, &RMF_LOGCOOKIES,
+                                         RCL_SERVER))
+                       req_capsule_set_size(tsi->tsi_pill, &RMF_LOGCOOKIES,
+                                            RCL_SERVER, 0);
 
-               dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
-               if (dlm_req != NULL) {
-                       if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
-                                    LDLM_IBITS &&
-                                    dlm_req->lock_desc.l_policy_data.\
-                                    l_inodebits.bits == 0)) {
-                               /*
-                                * Lock without inodebits makes no sense and
-                                * will oops later in ldlm. If client miss to
-                                * set such bits, do not trigger ASSERTION.
-                                *
-                                * For liblustre flock case, it maybe zero.
-                                */
-                               rc = -EPROTO;
-                       } else {
-                               tsi->tsi_dlm_req = dlm_req;
-                       }
-               } else {
-                       rc = -EFAULT;
-               }
+               rc = req_capsule_server_pack(tsi->tsi_pill);
        }
 
        if (likely(rc == 0)) {
@@ -488,13 +538,45 @@ int tgt_handle_recovery(struct ptlrpc_request *req, int reply_fail_id)
        RETURN(+1);
 }
 
+/* Initial check for request, it is validation mostly */
+static struct tgt_handler *tgt_handler_find_check(struct ptlrpc_request *req)
+{
+       struct tgt_handler      *h;
+       struct tgt_opc_slice    *s;
+       struct lu_target        *tgt;
+       __u32                    opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+       ENTRY;
+
+       tgt = class_exp2tgt(req->rq_export);
+
+       for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
+               if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
+                       break;
+
+       /* opcode was not found in slice */
+       if (unlikely(s->tos_hs == NULL)) {
+               CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt),
+                      opc);
+               RETURN(ERR_PTR(-ENOTSUPP));
+       }
+
+       LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
+       h = s->tos_hs + (opc - s->tos_opc_start);
+       if (unlikely(h->th_opc == 0)) {
+               CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
+               RETURN(ERR_PTR(-ENOTSUPP));
+       }
+
+       RETURN(h);
+}
+
 int tgt_request_handle(struct ptlrpc_request *req)
 {
        struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
 
        struct lustre_msg       *msg = req->rq_reqmsg;
        struct tgt_handler      *h;
-       struct tgt_opc_slice    *s;
        struct lu_target        *tgt;
        int                      request_fail_id = 0;
        __u32                    opc = lustre_msg_get_opc(msg);
@@ -543,14 +625,9 @@ int tgt_request_handle(struct ptlrpc_request *req)
        request_fail_id = tgt->lut_request_fail_id;
        tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
 
-       for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
-               if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
-                       break;
-
-       /* opcode was not found in slice */
-       if (unlikely(s->tos_hs == NULL)) {
-               CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc);
-               req->rq_status = -ENOTSUPP;
+       h = tgt_handler_find_check(req);
+       if (IS_ERR(h)) {
+               req->rq_status = PTR_ERR(h);
                rc = ptlrpc_error(req);
                GOTO(out, rc);
        }
@@ -558,17 +635,6 @@ int tgt_request_handle(struct ptlrpc_request *req)
        if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
                GOTO(out, rc = 0);
 
-       LASSERT(current->journal_info == NULL);
-
-       LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
-       h = s->tos_hs + (opc - s->tos_opc_start);
-       if (unlikely(h->th_opc == 0)) {
-               CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
-               req->rq_status = -ENOTSUPP;
-               rc = ptlrpc_error(req);
-               GOTO(out, rc);
-       }
-
        rc = lustre_msg_check_version(msg, h->th_version);
        if (unlikely(rc)) {
                DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
@@ -590,20 +656,48 @@ int tgt_request_handle(struct ptlrpc_request *req)
        EXIT;
 out:
        req_capsule_fini(tsi->tsi_pill);
-       tsi->tsi_pill = NULL;
        if (tsi->tsi_corpus != NULL) {
                lu_object_put(tsi->tsi_env, tsi->tsi_corpus);
                tsi->tsi_corpus = NULL;
        }
-       tsi->tsi_env = NULL;
-       tsi->tsi_mdt_body = NULL;
-       tsi->tsi_dlm_req = NULL;
-       fid_zero(&tsi->tsi_fid);
-       memset(&tsi->tsi_resid, 0, sizeof tsi->tsi_resid);
        return rc;
 }
 EXPORT_SYMBOL(tgt_request_handle);
 
+/** Assign high priority operations to the request if needed. */
+int tgt_hpreq_handler(struct ptlrpc_request *req)
+{
+       struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
+       struct tgt_handler      *h;
+       int                      rc;
+
+       ENTRY;
+
+       if (req->rq_export == NULL)
+               RETURN(0);
+
+       req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+       tsi->tsi_pill = &req->rq_pill;
+       tsi->tsi_env = req->rq_svc_thread->t_env;
+       tsi->tsi_tgt = class_exp2tgt(req->rq_export);
+       tsi->tsi_exp = req->rq_export;
+
+       h = tgt_handler_find_check(req);
+       if (IS_ERR(h)) {
+               rc = PTR_ERR(h);
+               RETURN(rc);
+       }
+
+       rc = tgt_request_preprocess(tsi, h, req);
+       if (unlikely(rc != 0))
+               RETURN(rc);
+
+       if (h->th_hp != NULL)
+               h->th_hp(tsi);
+       RETURN(0);
+}
+EXPORT_SYMBOL(tgt_hpreq_handler);
+
 void tgt_counter_incr(struct obd_export *exp, int opcode)
 {
        lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
@@ -1498,11 +1592,11 @@ int tgt_brw_read(struct tgt_session_info *tsi)
 
        /* Check if there is eviction in progress, and if so, wait for it to
         * finish */
-       if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+       if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
                /* We do not care how long it takes */
                lwi = LWI_INTR(NULL, NULL);
                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
-                        !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
+                        !atomic_read(&exp->exp_obd->obd_evict_inprogress),
                         &lwi);
        }