Whamcloud - gitweb
Branch HEAD
authorvitaly <vitaly>
Thu, 21 Feb 2008 13:25:21 +0000 (13:25 +0000)
committervitaly <vitaly>
Thu, 21 Feb 2008 13:25:21 +0000 (13:25 +0000)
b=14149
i=shadow
i=huanghua

fixes for ELC bugs appeared after moving to capsule

lustre/include/lustre_net.h
lustre/include/lustre_req_layout.h
lustre/ldlm/ldlm_request.c
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_reint.c
lustre/osc/osc_request.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/pack_generic.c

index 1a9768d..748fe49 100644 (file)
@@ -883,6 +883,7 @@ int lustre_pack_reply_v2(struct ptlrpc_request *req, int count,
 int lustre_shrink_msg(struct lustre_msg *msg, int segment,
                       unsigned int newlen, int move_data);
 void lustre_free_reply_state(struct ptlrpc_reply_state *rs);
+int lustre_msg_hdr_size(__u32 magic, int count);
 int lustre_msg_size(__u32 magic, int count, int *lengths);
 int lustre_msg_size_v2(int count, int *lengths);
 int lustre_packed_msg_size(struct lustre_msg *msg);
index eb5cde5..4c88a6b 100644 (file)
@@ -91,6 +91,9 @@ void req_capsule_set_size(struct req_capsule *pill,
 int req_capsule_get_size(const struct req_capsule *pill,
                           const struct req_msg_field *field,
                           enum req_location loc);
+int req_capsule_msg_size(const struct req_capsule *pill, enum req_location loc);
+int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt,
+                         enum req_location loc);
 void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt);
 
 int req_capsule_has_field(const struct req_capsule *pill,
index 9ccf69c..fae7632 100644 (file)
@@ -486,28 +486,31 @@ cleanup:
 /* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
  * a single page on the send/receive side. XXX: 512 should be changed
  * to more adequate value. */
-static inline int ldlm_req_handles_avail(struct obd_export *exp,
-                                         int *size, int bufcount,
-                                         int bufoff, int off)
+static inline int ldlm_req_handles_avail(int req_size, int off)
 {
-        int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
-        int old_size = size[bufoff];
+        int avail;
 
-        size[bufoff] = sizeof(struct ldlm_request);
-        avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
-                                 bufcount, size);
+        avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512) - req_size;
         avail /= sizeof(struct lustre_handle);
         avail += LDLM_LOCKREQ_HANDLES - off;
-        size[bufoff] = old_size;
 
         return avail;
 }
 
-static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
+static inline int ldlm_capsule_handles_avail(struct req_capsule *pill,
+                                             enum req_location loc,
+                                             int off)
 {
-        int size[2] = { sizeof(struct ptlrpc_body),
-                        sizeof(struct ldlm_request) };
-        return ldlm_req_handles_avail(exp, size, 2, DLM_LOCKREQ_OFF, 0);
+        int size = req_capsule_msg_size(pill, loc);
+        return ldlm_req_handles_avail(size, off);
+}
+
+static inline int ldlm_format_handles_avail(struct obd_import *imp,
+                                            const struct req_format *fmt,
+                                            enum req_location loc, int off)
+{
+        int size = req_capsule_fmt_size(imp->imp_msg_magic, fmt, loc);
+        return ldlm_req_handles_avail(size, off);
 }
 
 /* Cancel lru locks and pack them into the enqueue request. Pack there the given
@@ -520,17 +523,17 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
         struct req_capsule      *pill = &req->rq_pill;
         struct ldlm_request     *dlm = NULL;
         int flags, avail, to_free, bufcount, pack = 0;
+        CFS_LIST_HEAD(head);
         int rc;
         ENTRY;
 
-
-        LASSERT(cancels != NULL);
-
+        if (cancels == NULL)
+                cancels = &head;
         if (exp_connect_cancelset(exp)) {
                 /* Estimate the amount of available space in the request. */
                 bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
-                avail = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
-                                               bufcount, bufcount - 1, canceloff);
+                avail = ldlm_capsule_handles_avail(pill, RCL_CLIENT, canceloff);
+
                 flags = ns_connect_lru_resize(ns) ? 
                         LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
                 to_free = !ns_connect_lru_resize(ns) &&
@@ -546,8 +549,8 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
                         pack = count;
                 else
                         pack = avail;
-                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
-                                     ldlm_request_bufsize(count, opc));
+                req_capsule_set_size(pill, &RMF_DLM_REQ, RCL_CLIENT,
+                                     ldlm_request_bufsize(pack, opc));
         }
 
         rc = ptlrpc_request_pack(req, version, opc);
@@ -576,10 +579,8 @@ int ldlm_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
         RETURN(0);
 }
 
-int ldlm_prep_enqueue_req(struct obd_export *exp,
-                          struct ptlrpc_request *req,
-                          struct list_head *cancels,
-                          int count)
+int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
+                          struct list_head *cancels, int count)
 {
         return ldlm_prep_elc_req(exp, req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
                                  LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
@@ -923,9 +924,14 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
                 RETURN(count);
 
+        free = ldlm_format_handles_avail(class_exp2cliimp(exp),
+                                         &RQF_LDLM_CANCEL, RCL_CLIENT, 0);
+        if (count > free)
+                count = free;
+
         while (1) {
                 int bufcount;
-                struct req_capsule *pill; 
+
                 imp = class_exp2cliimp(exp);
                 if (imp == NULL || imp->imp_invalid) {
                         CDEBUG(D_DLMTRACE,
@@ -937,14 +943,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                 if (req == NULL)
                         GOTO(out, rc = -ENOMEM);
 
-                pill = &req->rq_pill;
-                bufcount = req_capsule_filled_sizes(pill, RCL_CLIENT);
-
-                free = ldlm_req_handles_avail(exp, pill->rc_area[RCL_CLIENT],
-                                              bufcount, bufcount, 0);
-                if (count > free)
-                        count = free;
-
+                bufcount = req_capsule_filled_sizes(&req->rq_pill, RCL_CLIENT);
                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
                                      ldlm_request_bufsize(count, LDLM_CANCEL));
 
@@ -1057,6 +1056,7 @@ EXPORT_SYMBOL(ldlm_cli_update_pool);
 
 int ldlm_cli_cancel(struct lustre_handle *lockh)
 {
+        struct obd_export *exp;
         int avail, flags, count = 1, rc = 0;
         struct ldlm_namespace *ns;
         struct ldlm_lock *lock;
@@ -1080,8 +1080,12 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
          * here and send them all as one LDLM_CANCEL rpc. */
         LASSERT(list_empty(&lock->l_bl_ast));
         list_add(&lock->l_bl_ast, &cancels);
-        if (exp_connect_cancelset(lock->l_conn_export)) {
-                avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+
+        exp = lock->l_conn_export;
+        if (exp_connect_cancelset(exp)) {
+                avail = ldlm_format_handles_avail(class_exp2cliimp(exp),
+                                                  &RQF_LDLM_CANCEL,
+                                                  RCL_CLIENT, 0);
                 LASSERT(avail > 0);
 
                 ns = lock->l_resource->lr_namespace;
index 343b918..932564d 100644 (file)
@@ -307,19 +307,11 @@ static struct ptlrpc_request *mdc_intent_open_pack(struct obd_export *exp,
                                      RCL_CLIENT, 0);
         }
 
-        if (exp_connect_cancelset(exp) && count) {
-                req_capsule_set_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT,
-                                     ldlm_request_bufsize(count, LDLM_ENQUEUE));
-        }
-
-        rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+        rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
         if (rc) {
                 ptlrpc_request_free(req);
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 return NULL;
         }
-        if (exp_connect_cancelset(exp) && req)
-                ldlm_cli_cancel_list(&cancels, count, req, 0);
 
         if (joinfile) {
                 __u64 head_size = *(__u64 *)op_data->op_data;
@@ -365,7 +357,7 @@ static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                              op_data->op_namelen + 1);
 
-        rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
         if (rc) {
                 ptlrpc_request_free(req);
                 RETURN(ERR_PTR(rc));
@@ -410,7 +402,7 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
                              op_data->op_namelen + 1);
 
-        rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_ENQUEUE);
+        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
         if (rc) {
                 ptlrpc_request_free(req);
                 RETURN(ERR_PTR(rc));
@@ -435,14 +427,19 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
 static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp)
 {
         struct ptlrpc_request *req;
+        int rc;
         ENTRY;
 
-        req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
-                                        &RQF_LDLM_ENQUEUE, LUSTRE_DLM_VERSION,
-                                        LDLM_ENQUEUE);
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
         if (req == NULL)
                 RETURN(ERR_PTR(-ENOMEM));
 
+        rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+        if (rc) {
+                ptlrpc_request_free(req);
+                RETURN(ERR_PTR(rc));
+        }
+
         ptlrpc_request_set_replen(req);
         RETURN(req);
 }
index 9a81e2c..8a49a72 100644 (file)
@@ -138,7 +138,6 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
 
         rc = mdc_prep_elc_req(exp, req, &cancels, count);
         if (rc) {
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
@@ -233,7 +232,6 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
 
         rc = mdc_prep_elc_req(exp, req, &cancels, count);
         if (rc) {
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
@@ -306,7 +304,6 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
 
         rc = mdc_prep_elc_req(exp, req, &cancels, count);
         if (rc) {
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
@@ -359,7 +356,6 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
 
         rc = mdc_prep_elc_req(exp, req, &cancels, count);
         if (rc) {
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
@@ -420,7 +416,6 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
 
         rc = mdc_prep_elc_req(exp, req, &cancels, count);
         if (rc) {
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
index 1f1c42b..f89cb19 100644 (file)
@@ -694,7 +694,6 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY, 
                                0, &cancels, count);
         if (rc) {
-                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 ptlrpc_request_free(req);
                 RETURN(rc);
         }
index cfdf171..89ffe05 100644 (file)
@@ -1435,6 +1435,31 @@ int req_capsule_get_size(const struct req_capsule *pill,
 }
 EXPORT_SYMBOL(req_capsule_get_size);
 
+int req_capsule_msg_size(const struct req_capsule *pill, enum req_location loc)
+{
+        return lustre_msg_size(pill->rc_req->rq_import->imp_msg_magic,
+                               pill->rc_fmt->rf_fields[loc].nr,
+                               (int *)pill->rc_area[loc]);
+}
+
+int req_capsule_fmt_size(__u32 magic, const struct req_format *fmt,
+                         enum req_location loc)
+{
+        int size, i = 0;
+
+        size = lustre_msg_hdr_size(magic, fmt->rf_fields[loc].nr);
+        if (size < 0)
+                return size;
+
+        if (magic == LUSTRE_MSG_MAGIC_V1)
+                i++;
+
+        for (; i < fmt->rf_fields[loc].nr; ++i)
+                if (fmt->rf_fields[loc].d[i]->rmf_size != -1)
+                        size += size_round(fmt->rf_fields[loc].d[i]->rmf_size);
+        return size;
+}
+
 #define FMT_FIELD(fmt, i, j) (fmt)->rf_fields[(i)].d[(j)]
 
 void req_capsule_extend(struct req_capsule *pill, const struct req_format *fmt)
index e49eaa5..d108815 100644 (file)
@@ -55,6 +55,20 @@ static inline int lustre_msg_hdr_size_v2(int count)
         return size_round(offsetof(struct lustre_msg_v2, lm_buflens[count]));
 }
 
+int lustre_msg_hdr_size(__u32 magic, int count)
+{
+        switch (magic) {
+        case LUSTRE_MSG_MAGIC_V1:
+                return lustre_msg_hdr_size_v1(count - 1);
+        case LUSTRE_MSG_MAGIC_V2:
+                return lustre_msg_hdr_size_v2(count);
+        default:
+                LASSERTF(0, "incorrect message magic: %08x\n", magic);
+                return -EINVAL;
+        }
+}
+EXPORT_SYMBOL(lustre_msg_hdr_size);
+
 int lustre_msg_swabbed(struct lustre_msg *msg)
 {
         return (msg->lm_magic == LUSTRE_MSG_MAGIC_V1_SWABBED) ||