Whamcloud - gitweb
Branch HEAD
authorvitaly <vitaly>
Mon, 21 Jan 2008 12:45:09 +0000 (12:45 +0000)
committervitaly <vitaly>
Mon, 21 Jan 2008 12:45:09 +0000 (12:45 +0000)
b=14379
i=green
i=shadow

Check the available space for lock handles in all the ELC rpc, send extra CANCEL rpc if there is no enough space.
Cancel AGED lru locks in all ELC rpc, not only in ENQUEUE & CANCEL.

lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/mdc/mdc_reint.c
lustre/osc/osc_request.c

index fc9f390..416dc60 100644 (file)
@@ -729,6 +729,10 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
                                              int bufcount, int *size,
                                              struct list_head *head, int count);
+struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
+                                         int opc, int bufcount, int *size,
+                                         int bufoff, int canceloff,
+                                         struct list_head *cancels, int count);
 int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
                          const struct ldlm_request *dlm_req,
                          const struct ldlm_callback_suite *cbs);
index 9aac8b0..d67362f 100644 (file)
@@ -2155,6 +2155,7 @@ EXPORT_SYMBOL(ldlm_blocking_ast);
 EXPORT_SYMBOL(ldlm_glimpse_ast);
 EXPORT_SYMBOL(ldlm_expired_completion_wait);
 EXPORT_SYMBOL(ldlm_prep_enqueue_req);
+EXPORT_SYMBOL(ldlm_prep_elc_req);
 EXPORT_SYMBOL(ldlm_cli_convert);
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
index 889dac7..adcc5d7 100644 (file)
@@ -486,17 +486,18 @@ cleanup:
  * a single page on the send/receive side. XXX: 512 should be changed
  * to more adequate value. */
 static inline int ldlm_req_handles_avail(struct obd_export *exp,
-                                         int *size, int bufcount, int off)
+                                         int *size, int bufcount,
+                                         int bufoff, int off)
 {
         int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
-        int old_size = size[DLM_LOCKREQ_OFF];
+        int old_size = size[bufoff];
 
-        size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);
+        size[bufoff] = sizeof(struct ldlm_request);
         avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
                                  bufcount, size);
         avail /= sizeof(struct lustre_handle);
         avail += LDLM_LOCKREQ_HANDLES - off;
-        size[DLM_LOCKREQ_OFF] = old_size;
+        size[bufoff] = old_size;
 
         return avail;
 }
@@ -505,17 +506,18 @@ static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
 {
         int size[2] = { sizeof(struct ptlrpc_body),
                         sizeof(struct ldlm_request) };
-        return ldlm_req_handles_avail(exp, size, 2, 0);
+        return ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
 }
 
 /* Cancel lru locks and pack them into the enqueue request. Pack there the given
  * @count locks in @cancels. */
-struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
-                                             int bufcount, int *size,
-                                             struct list_head *cancels,
-                                             int count)
+struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
+                                         int opc, int bufcount, int *size,
+                                         int bufoff, int canceloff,
+                                         struct list_head *cancels, int count)
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+        int flags, avail, to_free, pack = 0;
         struct ldlm_request *dlm = NULL;
         struct ptlrpc_request *req;
         CFS_LIST_HEAD(head);
@@ -525,41 +527,57 @@ struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
                 cancels = &head;
         if (exp_connect_cancelset(exp)) {
                 /* Estimate the amount of available space in the request. */
-                int avail = ldlm_req_handles_avail(exp, size, bufcount,
-                                                   LDLM_ENQUEUE_CANCEL_OFF);
-                int flags, cancel;
-
-                LASSERT(avail >= count);
-
+                avail = ldlm_req_handles_avail(exp, size, bufcount,
+                                               bufoff, canceloff);
                 flags = ns_connect_lru_resize(ns) ? 
                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
-                cancel = ns_connect_lru_resize(ns) ? 0 : 1;
+                to_free = !ns_connect_lru_resize(ns) &&
+                          opc == LDLM_ENQUEUE ? 1 : 0;
 
                 /* Cancel lru locks here _only_ if the server supports 
                  * EARLY_CANCEL. Otherwise we have to send extra CANCEL
-                 * rpc right on enqueue, what will make it slower, vs. 
-                 * asynchronous rpc in blocking thread. */
-                count += ldlm_cancel_lru_local(ns, cancels, cancel,
-                                               avail - count, 0, flags);
-                size[DLM_LOCKREQ_OFF] =
-                        ldlm_request_bufsize(count, LDLM_ENQUEUE);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
-                              LDLM_ENQUEUE, bufcount, size, NULL);
+                 * rpc, what will make us slower. */
+                if (avail > count)
+                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
+                                                       avail - count, 0, flags);
+                if (avail > count)
+                        pack = count;
+                else
+                        pack = avail;
+                size[bufoff] = ldlm_request_bufsize(pack, opc);
+        }
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
+                              opc, bufcount, size, NULL);
         if (exp_connect_cancelset(exp) && req) {
-                dlm = lustre_msg_buf(req->rq_reqmsg,
-                                     DLM_LOCKREQ_OFF, sizeof(*dlm));
-                /* Skip first lock handler in ldlm_request_pack(), this method
-                 * will incrment @lock_count according to the lock handle amount
-                 * actually written to the buffer. */
-                dlm->lock_count = LDLM_ENQUEUE_CANCEL_OFF;
-                ldlm_cli_cancel_list(cancels, count, req, DLM_LOCKREQ_OFF, 0);
+                if (canceloff) {
+                        dlm = lustre_msg_buf(req->rq_reqmsg, bufoff,
+                                             sizeof(*dlm));
+                        /* Skip first lock handler in ldlm_request_pack(),
+                         * this method will incrment @lock_count according
+                         * to the lock handle amount actually written to
+                         * the buffer. */
+                        dlm->lock_count = canceloff;
+                }
+                /* Pack into the request @pack lock handles. */
+                ldlm_cli_cancel_list(cancels, pack, req, bufoff, 0);
+                /* Prepare and send separate cancel rpc for others. */
+                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0, 0);
         } else {
                 ldlm_lock_list_put(cancels, l_bl_ast, count);
         }
         RETURN(req);
 }
 
+struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
+                                             int bufcount, int *size,
+                                             struct list_head *cancels,
+                                             int count)
+{
+        return ldlm_prep_elc_req(exp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
+                                 bufcount, size, DLM_LOCKREQ_OFF,
+                                 LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
+}
+
 /* If a request has some specific initialisation it is passed in @reqp,
  * otherwise it is created in ldlm_cli_enqueue.
  *
@@ -898,7 +916,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
                 RETURN(count);
 
-        free = ldlm_req_handles_avail(exp, size, 2, 0);
+        free = ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
         if (count > free)
                 count = free;
 
@@ -1048,13 +1066,16 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
          * here and send them all as one LDLM_CANCEL rpc. */
         LASSERT(list_empty(&lock->l_bl_ast));
         list_add(&lock->l_bl_ast, &cancels);
-        avail = ldlm_cancel_handles_avail(lock->l_conn_export);
-        LASSERT(avail > 0);
+        if (exp_connect_cancelset(lock->l_conn_export)) {
+                avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+                LASSERT(avail > 0);
 
-        ns = lock->l_resource->lr_namespace;
-        flags = ns_connect_lru_resize(ns) ? LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
-        count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - count,
-                                       LDLM_FL_BL_AST, flags);
+                ns = lock->l_resource->lr_namespace;
+                flags = ns_connect_lru_resize(ns) ?
+                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
+                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
+                                               LDLM_FL_BL_AST, flags);
+        }
         ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
         RETURN(0);
 }
@@ -1474,7 +1495,6 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
                 count -= res;
                 ldlm_lock_list_put(cancels, l_bl_ast, res);
         }
-        LASSERT(list_empty(cancels));
         LASSERT(count == 0);
         RETURN(0);
 }
index 0021b22..ee27032 100644 (file)
@@ -90,6 +90,14 @@ int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
         RETURN(count);
 }
 
+struct ptlrpc_request *mdc_prep_elc_req(struct obd_export *exp,
+                                        int bufcount, int *size, int off,
+                                        struct list_head *cancels, int count)
+{
+        return ldlm_prep_elc_req(exp, LUSTRE_MDS_VERSION, MDS_REINT,
+                                 bufcount, size, off, 0, cancels, count);
+}
+
 /* If mdc_setattr is called with an 'iattr', then it is a normal RPC that
  * should take the normal semaphore and go to the normal portal.
  *
@@ -132,16 +140,10 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
             (fid_is_sane(&op_data->op_fid1)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
                                                 &cancels, LCK_EX, bits);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 7;
-                size[REQ_REC_OFF + 5] = ldlm_request_bufsize(count, MDS_REINT);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_REINT, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 5, 0);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+        req = mdc_prep_elc_req(exp, bufcount, size,
+                               REQ_REC_OFF + 5, &cancels, count);
 
         if (req == NULL)
                 RETURN(-ENOMEM);
@@ -235,16 +237,10 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
                                                 &cancels, LCK_EX,
                                                 MDS_INODELOCK_UPDATE);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 6;
-                size[REQ_REC_OFF + 4] = ldlm_request_bufsize(count, MDS_REINT);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_REINT, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 4, 0);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+        req = mdc_prep_elc_req(exp, bufcount, size,
+                               REQ_REC_OFF + 4, &cancels, count);
 
         if (req == NULL)
                 RETURN(-ENOMEM);
@@ -316,16 +312,11 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
                                                  &cancels, LCK_EX,
                                                  MDS_INODELOCK_FULL);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 5;
-                size[REQ_REC_OFF + 3] = ldlm_request_bufsize(count, MDS_REINT);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_REINT, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 3, 0);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+        
+        req = mdc_prep_elc_req(exp, bufcount, size,
+                               REQ_REC_OFF + 3, &cancels, count);
 
         if (req == NULL)
                 RETURN(-ENOMEM);
@@ -371,17 +362,10 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
                 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
                                                  &cancels, LCK_EX,
                                                  MDS_INODELOCK_UPDATE);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 6;
-                size[REQ_REC_OFF + 4] = ldlm_request_bufsize(count, MDS_REINT);
-        }
-
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_REINT, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 4, 0);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+        req = mdc_prep_elc_req(exp, bufcount, size,
+                               REQ_REC_OFF + 4, &cancels, count);
 
         if (req == NULL)
                 RETURN(-ENOMEM);
@@ -437,16 +421,10 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
                 count += mdc_resource_get_unused(exp, &op_data->op_fid4,
                                                  &cancels, LCK_EX,
                                                  MDS_INODELOCK_FULL);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 7;
-                size[REQ_REC_OFF + 5] = ldlm_request_bufsize(count, MDS_REINT);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_REINT, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 5, 0);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+        req = mdc_prep_elc_req(exp, bufcount, size,
+                               REQ_REC_OFF + 5, &cancels, count);
 
         if (req == NULL)
                 RETURN(-ENOMEM);
index 87e3867..fbbc634 100644 (file)
@@ -622,18 +622,10 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
 
         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
                                         LDLM_FL_DISCARD_DATA);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 3;
-                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,
-                                                             OST_DESTROY);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
-                              OST_DESTROY, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
-
+        req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
+                                size, REQ_REC_OFF + 1, 0, &cancels, count);
         if (!req)
                 RETURN(-ENOMEM);