Whamcloud - gitweb
Branch b1_6
authorvitaly <vitaly>
Mon, 21 Jan 2008 12:37:45 +0000 (12:37 +0000)
committervitaly <vitaly>
Mon, 21 Jan 2008 12:37:45 +0000 (12:37 +0000)
b=14379
i=green
i=shadow

Check the available space for lock handles in all the ELC rpc, send extra rpc if there is no enough space.
Cancel AGED lru locks in all the ELC rpc, not in enqueue & cancel only.

lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/mdc/mdc_reint.c
lustre/osc/osc_request.c

index a3adc04..78b4a13 100644 (file)
@@ -740,6 +740,10 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **req,
 struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
                                              int bufcount, int *size,
                                              struct list_head *head, int count);
+struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
+                                         int opc, int bufcount, int *size,
+                                         int bufoff, int canceloff,
+                                         struct list_head *cancels, int count);
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                           ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
                           int *flags, void *lvb, __u32 lvb_len,
@@ -770,7 +774,6 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
                                int lock_flags, int cancel_flags, void *opaque);
 int ldlm_cli_cancel_list(struct list_head *head, int count,
                          struct ptlrpc_request *req, int off);
-
 /* mds/handler.c */
 /* This has to be here because recursive inclusion sucks. */
 int intent_disposition(struct ldlm_reply *rep, int flag);
index 8e57759..924fd4b 100644 (file)
@@ -2075,6 +2075,7 @@ EXPORT_SYMBOL(ldlm_blocking_ast);
 EXPORT_SYMBOL(ldlm_glimpse_ast);
 EXPORT_SYMBOL(ldlm_expired_completion_wait);
 EXPORT_SYMBOL(ldlm_prep_enqueue_req);
+EXPORT_SYMBOL(ldlm_prep_elc_req);
 EXPORT_SYMBOL(ldlm_cli_convert);
 EXPORT_SYMBOL(ldlm_cli_enqueue);
 EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
index 699af94..fa1d4c9 100644 (file)
@@ -511,17 +511,18 @@ cleanup:
  * a single page on the send/receive side. XXX: 512 should be changed
  * to more adequate value. */
 static inline int ldlm_req_handles_avail(struct obd_export *exp,
-                                         int *size, int bufcount, int off)
+                                         int *size, int bufcount,
+                                         int bufoff, int off)
 {
         int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
-        int old_size = size[DLM_LOCKREQ_OFF];
+        int old_size = size[bufoff];
 
-        size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);
+        size[bufoff] = sizeof(struct ldlm_request);
         avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
                                  bufcount, size);
         avail /= sizeof(struct lustre_handle);
         avail += LDLM_LOCKREQ_HANDLES - off;
-        size[DLM_LOCKREQ_OFF] = old_size;
+        size[bufoff] = old_size;
 
         return avail;
 }
@@ -530,61 +531,78 @@ static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
 {
         int size[2] = { sizeof(struct ptlrpc_body),
                         sizeof(struct ldlm_request) };
-        return ldlm_req_handles_avail(exp, size, 2, 0);
+        return ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
 }
 
 /* Cancel lru locks and pack them into the enqueue request. Pack there the given
  * @count locks in @cancels. */
-struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
-                                             int bufcount, int *size,
-                                             struct list_head *cancels,
-                                             int count)
+struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
+                                         int opc, int bufcount, int *size,
+                                         int bufoff, int canceloff,
+                                         struct list_head *cancels, int count)
 {
         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+        int flags, avail, to_free, pack = 0;
         struct ldlm_request *dlm = NULL;
         struct ptlrpc_request *req;
         CFS_LIST_HEAD(head);
         ENTRY;
-        
+
         if (cancels == NULL)
                 cancels = &head;
         if (exp_connect_cancelset(exp)) {
                 /* Estimate the amount of free space in the request. */
-                int avail = ldlm_req_handles_avail(exp, size, bufcount,
-                                                   LDLM_ENQUEUE_CANCEL_OFF);
-                int flags, to_free;
-                
-                LASSERT(avail >= count);
-
+                avail = ldlm_req_handles_avail(exp, size, bufcount,
+                                               bufoff, canceloff);
                 flags = ns_connect_lru_resize(ns) ? 
                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
-                to_free = ns_connect_lru_resize(ns) ? 0 : 1;
+                to_free = !ns_connect_lru_resize(ns) &&
+                          opc == LDLM_ENQUEUE ? 1 : 0;
 
                 /* Cancel lru locks here _only_ if the server supports 
                  * EARLY_CANCEL. Otherwise we have to send extra CANCEL
-                 * rpc right on enqueue, what will make it slower, vs. 
-                 * asynchronous rpc in blocking thread. */
-                count += ldlm_cancel_lru_local(ns, cancels, to_free,
-                                               avail - count, 0, flags);
-                size[DLM_LOCKREQ_OFF] =
-                        ldlm_request_bufsize(count, LDLM_ENQUEUE);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
-                              LDLM_ENQUEUE, bufcount, size, NULL);
+                 * rpc, what will make us slower. */
+                if (avail > count)
+                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
+                                                       avail - count, 0, flags);
+                if (avail > count)
+                        pack = count;
+                else
+                        pack = avail;
+                size[bufoff] = ldlm_request_bufsize(pack, opc);
+        }
+        req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
+                              opc, bufcount, size, NULL);
         if (exp_connect_cancelset(exp) && req) {
-                dlm = lustre_msg_buf(req->rq_reqmsg,
-                                     DLM_LOCKREQ_OFF, sizeof(*dlm));
-                /* Skip first lock handler in ldlm_request_pack(), this method
-                 * will incrment @lock_count according to the lock handle amount
-                 * actually written to the buffer. */
-                dlm->lock_count = LDLM_ENQUEUE_CANCEL_OFF;
-                ldlm_cli_cancel_list(cancels, count, req, DLM_LOCKREQ_OFF);
+                if (canceloff) {
+                        dlm = lustre_msg_buf(req->rq_reqmsg, bufoff,
+                                             sizeof(*dlm));
+                        /* Skip first lock handler in ldlm_request_pack(),
+                         * this method will incrment @lock_count according
+                         * to the lock handle amount actually written to
+                         * the buffer. */
+                        dlm->lock_count = canceloff;
+                }
+                /* Pack into the request @pack lock handles. */
+                ldlm_cli_cancel_list(cancels, pack, req, bufoff);
+                /* Prepare and send separate cancel rpc for others. */
+                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
         } else {
                 ldlm_lock_list_put(cancels, l_bl_ast, count);
         }
         RETURN(req);
 }
 
+struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
+                                             int bufcount, int *size,
+                                             struct list_head *cancels,
+                                             int count)
+{
+        return ldlm_prep_elc_req(exp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
+                                 bufcount, size, DLM_LOCKREQ_OFF,
+                                 LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
+}
+
 /* If a request has some specific initialisation it is passed in @reqp,
  * otherwise it is created in ldlm_cli_enqueue.
  *
@@ -924,7 +942,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp,
         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
                 RETURN(count);
 
-        free = ldlm_req_handles_avail(exp, size, 2, 0);
+        free = ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
         if (count > free)
                 count = free;
 
@@ -1066,13 +1084,16 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         LASSERT(list_empty(&lock->l_bl_ast));
         list_add(&lock->l_bl_ast, &cancels);
 
-        avail = ldlm_cancel_handles_avail(lock->l_conn_export);
-        LASSERT(avail > 0);
+        if (exp_connect_cancelset(lock->l_conn_export)) {
+                avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+                LASSERT(avail > 0);
 
-        ns = lock->l_resource->lr_namespace;
-        flags = ns_connect_lru_resize(ns) ? LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
-        count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
-                                       LDLM_FL_BL_AST, flags);
+                ns = lock->l_resource->lr_namespace;
+                flags = ns_connect_lru_resize(ns) ?
+                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
+                count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1,
+                                               LDLM_FL_BL_AST, flags);
+        }
         ldlm_cli_cancel_list(&cancels, count, NULL, 0);
         RETURN(0);
 }
@@ -1487,7 +1508,6 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
                 count -= res;
                 ldlm_lock_list_put(cancels, l_bl_ast, res);
         }
-        LASSERT(list_empty(cancels));
         RETURN(0);
 }
 
index 37f3a29..0b39336 100644 (file)
@@ -87,6 +87,14 @@ int mdc_resource_get_unused(struct obd_export *exp, struct ll_fid *fid,
         RETURN(count);
 }
 
+struct ptlrpc_request *mdc_prep_elc_req(struct obd_export *exp,
+                                        int bufcount, int *size, int off,
+                                        struct list_head *cancels, int count)
+{
+        return ldlm_prep_elc_req(exp, LUSTRE_MDS_VERSION, MDS_REINT,
+                                 bufcount, size, off, 0, cancels, count);
+}
+
 /* If mdc_setattr is called with an 'iattr', then it is a normal RPC that
  * should take the normal semaphore and go to the normal portal.
  *
@@ -121,17 +129,10 @@ int mdc_setattr(struct obd_export *exp, struct mdc_op_data *op_data,
                 bits |= MDS_INODELOCK_LOOKUP;
         count = mdc_resource_get_unused(exp, &op_data->fid1,
                                         &cancels, LCK_EX, bits);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 5;
-                size[REQ_REC_OFF + 3] = ldlm_request_bufsize(count, MDS_REINT);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_REINT, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 3);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
-
+        req = mdc_prep_elc_req(exp, bufcount, size,
+                               REQ_REC_OFF + 3, &cancels, count);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -181,17 +182,10 @@ int mdc_create(struct obd_export *exp, struct mdc_op_data *op_data,
 
         count = mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
                                         LCK_EX, MDS_INODELOCK_UPDATE);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 5;
-                size[REQ_REC_OFF + 3] = ldlm_request_bufsize(count, MDS_REINT);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_REINT, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 3);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
-
+        req = mdc_prep_elc_req(exp, bufcount, size,
+                               REQ_REC_OFF + 3, &cancels, count);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -237,17 +231,10 @@ int mdc_unlink(struct obd_export *exp, struct mdc_op_data *op_data,
         if (op_data->fid3.id)
                 count += mdc_resource_get_unused(exp, &op_data->fid3, &cancels,
                                                  LCK_EX, MDS_INODELOCK_FULL);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 4;
-                size[REQ_REC_OFF + 2] = ldlm_request_bufsize(count, MDS_REINT);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_REINT, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 2);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
-
+        req = mdc_prep_elc_req(exp, bufcount, size,
+                               REQ_REC_OFF + 2, &cancels, count);
         if (req == NULL)
                 RETURN(-ENOMEM);
         *request = req;
@@ -281,17 +268,10 @@ int mdc_link(struct obd_export *exp, struct mdc_op_data *op_data,
                                         LCK_EX, MDS_INODELOCK_UPDATE);
         count += mdc_resource_get_unused(exp, &op_data->fid2, &cancels,
                                          LCK_EX, MDS_INODELOCK_UPDATE);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 4;
-                size[REQ_REC_OFF + 2] = ldlm_request_bufsize(count, MDS_REINT);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_REINT, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 2);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
-
+        req = mdc_prep_elc_req(exp, bufcount, size,
+                               REQ_REC_OFF + 2, &cancels, count);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -331,17 +311,10 @@ int mdc_rename(struct obd_export *exp, struct mdc_op_data *op_data,
         if (op_data->fid4.id)
                 count += mdc_resource_get_unused(exp, &op_data->fid4, &cancels,
                                                  LCK_EX, MDS_INODELOCK_FULL);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 5;
-                size[REQ_REC_OFF + 3] = ldlm_request_bufsize(count, MDS_REINT);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
-                              MDS_REINT, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 3);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
-
+        req = mdc_prep_elc_req(exp, bufcount, size,
+                               REQ_REC_OFF + 3, &cancels, count);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
index 097c006..2d428bc 100644 (file)
@@ -598,17 +598,10 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
 
         count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,
                                         LDLM_FL_DISCARD_DATA);
-        if (exp_connect_cancelset(exp) && count) {
+        if (exp_connect_cancelset(exp) && count)
                 bufcount = 3;
-                size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
-        }
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
-                              OST_DESTROY, bufcount, size, NULL);
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1);
-        else
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
-
+        req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,
+                                size, REQ_REC_OFF + 1, 0, &cancels, count);
         if (!req)
                 RETURN(-ENOMEM);