LU-1431 ptlrpc: Support for over 1MB bulk I/O RPC
[fs/lustre-release.git] / lustre / ptlrpc / client.c
index 819276c..4dea902 100644
@@ -98,26 +98,34 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
 EXPORT_SYMBOL(ptlrpc_uuid_to_connection);
 
 /**
- * Allocate and initialize new bulk descriptor
+ * Allocate and initialize new bulk descriptor on the sender.
  * Returns pointer to the descriptor or NULL on error.
  */
-struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
+struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
+                                        unsigned type, unsigned portal)
 {
-        struct ptlrpc_bulk_desc *desc;
+       struct ptlrpc_bulk_desc *desc;
+       int i;
 
-        OBD_ALLOC(desc, offsetof (struct ptlrpc_bulk_desc, bd_iov[npages]));
-        if (!desc)
-                return NULL;
+       OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[npages]));
+       if (!desc)
+               return NULL;
 
        spin_lock_init(&desc->bd_lock);
-        cfs_waitq_init(&desc->bd_waitq);
-        desc->bd_max_iov = npages;
-        desc->bd_iov_count = 0;
-        LNetInvalidateHandle(&desc->bd_md_h);
-        desc->bd_portal = portal;
-        desc->bd_type = type;
-
-        return desc;
+       cfs_waitq_init(&desc->bd_waitq);
+       desc->bd_max_iov = npages;
+       desc->bd_iov_count = 0;
+       desc->bd_portal = portal;
+       desc->bd_type = type;
+       desc->bd_md_count = 0;
+       LASSERT(max_brw > 0);
+       desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
+       /* PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
+        * node; the negotiated ocd_brw_size will never require more than
+        * this many bulk transfers. */
+       for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++)
+               LNetInvalidateHandle(&desc->bd_mds[i]);
+
+       return desc;
 }
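
A minimal sketch of the arithmetic behind the new max_brw argument, assuming a
1 MiB per-bulk-MD cap and a compile-time PTLRPC_BULK_OPS_COUNT of 4; both
numbers are illustrative assumptions here, not values taken from this patch:

/* Illustrative only: how many bulk MDs a transfer of 'bytes' needs when
 * each MD carries at most 1 MiB, and how ptlrpc_new_bulk() clamps that
 * request via bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT). */
static unsigned int example_max_brw(unsigned long long bytes)
{
        const unsigned long long one_mb = 1ULL << 20;   /* assumed per-MD cap */
        const unsigned int bulk_ops_count = 4;          /* assumed compile-time limit */
        unsigned long long brw = (bytes + one_mb - 1) / one_mb;

        return brw < bulk_ops_count ? (unsigned int)brw : bulk_ops_count;
}
/* example_max_brw(1 << 20) == 1; example_max_brw(4 << 20) == 4;
 * anything larger is clamped to the compile-time limit of 4 here. */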
 
 /**
@@ -128,16 +136,17 @@ struct ptlrpc_bulk_desc *new_bulk(int npages, int type, int portal)
  * error.
  */
 struct ptlrpc_bulk_desc *ptlrpc_prep_bulk_imp(struct ptlrpc_request *req,
-                                              int npages, int type, int portal)
+                                             unsigned npages, unsigned max_brw,
+                                             unsigned type, unsigned portal)
 {
-        struct obd_import *imp = req->rq_import;
-        struct ptlrpc_bulk_desc *desc;
+       struct obd_import *imp = req->rq_import;
+       struct ptlrpc_bulk_desc *desc;
 
-        ENTRY;
-        LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
-        desc = new_bulk(npages, type, portal);
-        if (desc == NULL)
-                RETURN(NULL);
+       ENTRY;
+       LASSERT(type == BULK_PUT_SINK || type == BULK_GET_SOURCE);
+       desc = ptlrpc_new_bulk(npages, max_brw, type, portal);
+       if (desc == NULL)
+               RETURN(NULL);
 
         desc->bd_import_generation = req->rq_import_generation;
         desc->bd_import = class_import_get(imp);
@@ -182,29 +191,29 @@ EXPORT_SYMBOL(__ptlrpc_prep_bulk_page);
  */
 void __ptlrpc_free_bulk(struct ptlrpc_bulk_desc *desc, int unpin)
 {
-        int i;
-        ENTRY;
+       int i;
+       ENTRY;
 
-        LASSERT(desc != NULL);
-        LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
-        LASSERT(!desc->bd_network_rw);         /* network hands off or */
-        LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
+       LASSERT(desc != NULL);
+       LASSERT(desc->bd_iov_count != LI_POISON); /* not freed already */
+       LASSERT(desc->bd_md_count == 0);         /* network hands off */
+       LASSERT((desc->bd_export != NULL) ^ (desc->bd_import != NULL));
 
-        sptlrpc_enc_pool_put_pages(desc);
+       sptlrpc_enc_pool_put_pages(desc);
 
-        if (desc->bd_export)
-                class_export_put(desc->bd_export);
-        else
-                class_import_put(desc->bd_import);
+       if (desc->bd_export)
+               class_export_put(desc->bd_export);
+       else
+               class_import_put(desc->bd_import);
 
        if (unpin) {
                for (i = 0; i < desc->bd_iov_count ; i++)
                        cfs_page_unpin(desc->bd_iov[i].kiov_page);
        }
 
-        OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
-                                bd_iov[desc->bd_max_iov]));
-        EXIT;
+       OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc,
+                               bd_iov[desc->bd_max_iov]));
+       EXIT;
 }
 EXPORT_SYMBOL(__ptlrpc_free_bulk);
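
Here the bd_network_rw flag gives way to a per-descriptor MD count. A
conceptual sketch of the invariant the new assertion protects, assuming (this
is not code from the patch) that each bulk MD posted to the network bumps
bd_md_count and each completion or unlink event drops it:

#include <assert.h>

/* Conceptual sketch only, not the real ptlrpc registration path. */
struct example_bulk {
        unsigned int md_count;          /* stands in for bd_md_count */
};

static void example_post_md(struct example_bulk *b)
{
        b->md_count++;                  /* one more MD owned by the network */
}

static void example_md_event(struct example_bulk *b)
{
        b->md_count--;                  /* MD completed or unlinked */
}

static void example_free_bulk(struct example_bulk *b)
{
        assert(b->md_count == 0);       /* mirrors LASSERT(desc->bd_md_count == 0) */
}

With up to bd_md_max_brw MDs attached to one descriptor, a single boolean can
no longer say when the network is done with the bulk, hence the counter.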
 
@@ -1752,14 +1761,14 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
                 if (ptlrpc_client_bulk_active(req))
                         continue;
 
-                if (!req->rq_bulk->bd_success) {
-                        /* The RPC reply arrived OK, but the bulk screwed
-                         * up!  Dead weird since the server told us the RPC
-                         * was good after getting the REPLY for her GET or
-                         * the ACK for her PUT. */
-                        DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
-                        req->rq_status = -EIO;
-                }
+               if (req->rq_bulk->bd_failure) {
+                       /* The RPC reply arrived OK, but the bulk screwed
+                        * up!  Dead weird since the server told us the RPC
+                        * was good after getting the REPLY for her GET or
+                        * the ACK for her PUT. */
+                       DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
+                       req->rq_status = -EIO;
+               }
 
                 ptlrpc_rqphase_move(req, RQ_PHASE_INTERPRET);
 
@@ -2856,28 +2865,44 @@ static spinlock_t ptlrpc_last_xid_lock;
 #define YEAR_2004 (1ULL << 30)
 void ptlrpc_init_xid(void)
 {
-        time_t now = cfs_time_current_sec();
+       time_t now = cfs_time_current_sec();
 
        spin_lock_init(&ptlrpc_last_xid_lock);
-        if (now < YEAR_2004) {
-                cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
-                ptlrpc_last_xid >>= 2;
-                ptlrpc_last_xid |= (1ULL << 61);
-        } else {
-                ptlrpc_last_xid = (__u64)now << 20;
-        }
+       if (now < YEAR_2004) {
+               cfs_get_random_bytes(&ptlrpc_last_xid, sizeof(ptlrpc_last_xid));
+               ptlrpc_last_xid >>= 2;
+               ptlrpc_last_xid |= (1ULL << 61);
+       } else {
+               ptlrpc_last_xid = (__u64)now << 20;
+       }
+
+       /* Need to always be aligned to a power-of-two for multi-bulk BRW */
+       CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
+       ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
 }
 
 /**
- * Increase xid and returns resultng new value to the caller.
+ * Increases the xid and returns the resulting new value to the caller.
+ *
+ * Multi-bulk BRW RPCs consume multiple XIDs for each bulk transfer, starting
+ * at the returned xid, up to xid + PTLRPC_BULK_OPS_COUNT - 1. The BRW RPC
+ * itself uses the last bulk xid needed, so the server can determine the
+ * number of bulk transfers from the RPC XID and a bitmask.  The starting
+ * xid must be aligned to a power-of-two boundary.
+ *
+ * This holds because ptlrpc_init_xid() aligns the initial ptlrpc_last_xid
+ * to a PTLRPC_BULK_OPS_COUNT boundary (a power-of-two value). LU-1431
  */
 __u64 ptlrpc_next_xid(void)
 {
-       __u64 tmp;
+       __u64 next;
+
        spin_lock(&ptlrpc_last_xid_lock);
-       tmp = ++ptlrpc_last_xid;
+       next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
+       ptlrpc_last_xid = next;
        spin_unlock(&ptlrpc_last_xid_lock);
-       return tmp;
+
+       return next;
 }
 EXPORT_SYMBOL(ptlrpc_next_xid);
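
A worked sketch of the xid arithmetic described above, assuming
PTLRPC_BULK_OPS_COUNT is 4 and that PTLRPC_BULK_OPS_MASK clears the low
log2(PTLRPC_BULK_OPS_COUNT) bits; the helpers below illustrate the scheme the
comment describes and are not part of this patch:

#include <stdint.h>

#define EX_BULK_OPS_COUNT 4ULL                  /* assumed PTLRPC_BULK_OPS_COUNT */
#define EX_BULK_OPS_MASK  (~(EX_BULK_OPS_COUNT - 1))

/* The client gets an aligned block of xids from ptlrpc_next_xid(), uses
 * start .. start + nbulk - 1 for its bulk transfers, and sends the last
 * one as the RPC XID; the server recovers both values by masking. */
static uint64_t ex_bulk_start_xid(uint64_t rpc_xid)
{
        return rpc_xid & EX_BULK_OPS_MASK;      /* start of the aligned block */
}

static unsigned int ex_bulk_count(uint64_t rpc_xid)
{
        return (unsigned int)(rpc_xid & (EX_BULK_OPS_COUNT - 1)) + 1;
}

For example, an RPC XID of 0x1002 implies bulk xids 0x1000, 0x1001 and 0x1002:
ex_bulk_start_xid() returns 0x1000 and ex_bulk_count() returns 3.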
 
@@ -2889,14 +2914,16 @@ __u64 ptlrpc_sample_next_xid(void)
 {
 #if BITS_PER_LONG == 32
        /* need to avoid possible word tearing on 32-bit systems */
-       __u64 tmp;
+       __u64 next;
+
        spin_lock(&ptlrpc_last_xid_lock);
-       tmp = ptlrpc_last_xid + 1;
+       next = ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
        spin_unlock(&ptlrpc_last_xid_lock);
-       return tmp;
+
+       return next;
 #else
        /* No need to lock, since returned value is racy anyways */
-       return ptlrpc_last_xid + 1;
+       return ptlrpc_last_xid + PTLRPC_BULK_OPS_COUNT;
 #endif
 }
 EXPORT_SYMBOL(ptlrpc_sample_next_xid);
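
For readers unfamiliar with the word-tearing concern in the 32-bit branch: an
unlocked 64-bit load there may be performed as two 32-bit loads, so a
concurrent ptlrpc_next_xid() can slip in between them. A hypothetical
interleaving with made-up values:

        old ptlrpc_last_xid                    0x00000001fffffffc
        reader (no lock): loads low word   ->  0xfffffffc
        writer: ptlrpc_next_xid() stores       0x0000000200000000
        reader (no lock): loads high word  ->  0x00000002
        torn result                            0x00000002fffffffc  (neither old nor new)

Taking ptlrpc_last_xid_lock around the read rules this out; on 64-bit the
single load cannot tear, so the unlocked read is acceptable even though, as
the comment notes, the returned value is racy anyway.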