Whamcloud - gitweb
LU-13004 ptlrpc: Allow BULK_BUF_KIOV to accept a kvec
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 300ec30..85bb30d 100644 (file)
@@ -37,6 +37,7 @@
 #include <linux/delay.h>
 #include <linux/random.h>
 
+#include <lnet/lib-lnet.h>
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_lib.h>
@@ -68,6 +69,28 @@ static void ptlrpc_release_bulk_page_pin(struct ptlrpc_bulk_desc *desc)
                put_page(BD_GET_KIOV(desc, i).kiov_page);
 }
 
+static int ptlrpc_prep_bulk_frag_pages(struct ptlrpc_bulk_desc *desc,
+                                      void *frag, int len)
+{
+       unsigned int offset = (uintptr_t)frag & ~PAGE_MASK;
+
+       ENTRY;
+       while (len > 0) {
+               int page_len = min_t(unsigned int, PAGE_SIZE - offset,
+                                    len);
+               uintptr_t vaddr = (uintptr_t) frag;
+
+               ptlrpc_prep_bulk_page_nopin(desc,
+                                           lnet_kvaddr_to_page(vaddr),
+                                           offset, page_len);
+               offset = 0;
+               len -= page_len;
+               frag += page_len;
+       }
+
+       RETURN(desc->bd_nob);
+}
+
 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops = {
        .add_kiov_frag  = ptlrpc_prep_bulk_page_pin,
        .release_frags  = ptlrpc_release_bulk_page_pin,
@@ -77,6 +100,7 @@ EXPORT_SYMBOL(ptlrpc_bulk_kiov_pin_ops);
 const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
        .add_kiov_frag  = ptlrpc_prep_bulk_page_nopin,
        .release_frags  = ptlrpc_release_bulk_noop,
+       .add_iov_frag   = ptlrpc_prep_bulk_frag_pages,
 };
 EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
 
@@ -624,7 +648,7 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize,
 {
        struct ptlrpc_request_pool *pool;
 
-       OBD_ALLOC(pool, sizeof(struct ptlrpc_request_pool));
+       OBD_ALLOC_PTR(pool);
        if (!pool)
                return NULL;
 
@@ -737,6 +761,41 @@ static inline void ptlrpc_assign_next_xid(struct ptlrpc_request *req)
 
 static atomic64_t ptlrpc_last_xid;
 
+static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+{
+       spin_lock(&req->rq_import->imp_lock);
+       list_del_init(&req->rq_unreplied_list);
+       ptlrpc_assign_next_xid_nolock(req);
+       spin_unlock(&req->rq_import->imp_lock);
+       DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
+}
+
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req)
+{
+       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+       __u32 opc;
+       __u16 tag;
+
+       opc = lustre_msg_get_opc(req->rq_reqmsg);
+       tag = obd_get_mod_rpc_slot(cli, opc);
+       lustre_msg_set_tag(req->rq_reqmsg, tag);
+       ptlrpc_reassign_next_xid(req);
+}
+EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot);
+
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req)
+{
+       __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+       if (tag != 0) {
+               struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+               __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+               obd_put_mod_rpc_slot(cli, opc, tag);
+       }
+}
+EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot);
+
 int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
                             __u32 version, int opcode, char **bufs,
                             struct ptlrpc_cli_ctx *ctx)
@@ -894,7 +953,6 @@ ptlrpc_request_alloc_internal(struct obd_import *imp,
                              const struct req_format *format)
 {
        struct ptlrpc_request *request;
-       int connect = 0;
 
        request = __ptlrpc_request_alloc(imp, pool);
        if (!request)
@@ -915,17 +973,17 @@ ptlrpc_request_alloc_internal(struct obd_import *imp,
                if (imp->imp_state == LUSTRE_IMP_IDLE) {
                        imp->imp_generation++;
                        imp->imp_initiated_at = imp->imp_generation;
-                       imp->imp_state =  LUSTRE_IMP_NEW;
-                       connect = 1;
-               }
-               spin_unlock(&imp->imp_lock);
-               if (connect) {
-                       rc = ptlrpc_connect_import(imp);
+                       imp->imp_state = LUSTRE_IMP_NEW;
+
+                       /* connect_import_locked releases imp_lock */
+                       rc = ptlrpc_connect_import_locked(imp);
                        if (rc < 0) {
                                ptlrpc_request_free(request);
                                return NULL;
                        }
                        ptlrpc_pinger_add_import(imp);
+               } else {
+                       spin_unlock(&imp->imp_lock);
                }
        }
 
@@ -3336,7 +3394,8 @@ void ptlrpc_init_xid(void)
        }
 
        /* Need to always be aligned to a power-of-two for mutli-bulk BRW */
-       CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
+       BUILD_BUG_ON((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) !=
+                    0);
        xid &= PTLRPC_BULK_OPS_MASK;
        atomic64_set(&ptlrpc_last_xid, xid);
 }