Whamcloud - gitweb
b=14149
[fs/lustre-release.git] / lustre / mdc / mdc_reint.c
index ee27032..9a81e2c 100644 (file)
@@ -43,7 +43,8 @@
 
 /* mdc_setattr does its own semaphore handling */
 static int mdc_reint(struct ptlrpc_request *request,
-                     struct mdc_rpc_lock *rpc_lock, int level)
+                     struct mdc_rpc_lock *rpc_lock,
+                     int level)
 {
         int rc;
 
@@ -54,10 +55,7 @@ static int mdc_reint(struct ptlrpc_request *request,
         mdc_put_rpc_lock(rpc_lock, NULL);
         if (rc)
                 CDEBUG(D_INFO, "error in handling %d\n", rc);
-        else if (!lustre_swab_repbuf(request, REPLY_REC_OFF,
-                                     sizeof(struct mdt_body),
-                                     lustre_swab_mdt_body)) {
-                CERROR ("Can't unpack mdt_body\n");
+        else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) {
                 rc = -EPROTO;
         }
         return rc;
@@ -90,12 +88,11 @@ int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
         RETURN(count);
 }
 
-struct ptlrpc_request *mdc_prep_elc_req(struct obd_export *exp,
-                                        int bufcount, int *size, int off,
-                                        struct list_head *cancels, int count)
+static int mdc_prep_elc_req(struct obd_export *exp, struct ptlrpc_request *req,
+                            struct list_head *cancels, int count)
 {
-        return ldlm_prep_elc_req(exp, LUSTRE_MDS_VERSION, MDS_REINT,
-                                 bufcount, size, off, 0, cancels, count);
+        return ldlm_prep_elc_req(exp, req, LUSTRE_MDS_VERSION, MDS_REINT,
+                                 0, cancels, count);
 }
 
 /* If mdc_setattr is called with an 'iattr', then it is a normal RPC that
@@ -110,29 +107,14 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
 {
         CFS_LIST_HEAD(cancels);
         struct ptlrpc_request *req;
-        struct mdt_rec_setattr *rec;
         struct mdc_rpc_lock *rpc_lock;
         struct obd_device *obd = exp->exp_obd;
-        int size[7] = { sizeof(struct ptlrpc_body),
-                        sizeof(*rec), 0, 0, ealen, ea2len, 0 };
-        int count = 0, bufcount = 4, rc;
+        int count = 0, rc;
         __u64 bits;
         ENTRY;
 
         LASSERT(op_data != NULL);
 
-        size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
-                sizeof(struct lustre_capa) : 0;
-
-        if (op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN))
-                size[REQ_REC_OFF + 2] = sizeof(struct mdt_epoch);
-
-        if (ealen > 0) {
-                bufcount++;
-                if (ea2len > 0)
-                        bufcount++;
-        }
-
         bits = MDS_INODELOCK_UPDATE;
         if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
                 bits |= MDS_INODELOCK_LOOKUP;
@@ -140,13 +122,26 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
             (fid_is_sane(&op_data->op_fid1)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
                                                 &cancels, LCK_EX, bits);
-        if (exp_connect_cancelset(exp) && count)
-                bufcount = 7;
-        req = mdc_prep_elc_req(exp, bufcount, size,
-                               REQ_REC_OFF + 5, &cancels, count);
-
-        if (req == NULL)
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_MDS_REINT_SETATTR);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 RETURN(-ENOMEM);
+        }
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        if ((op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) == 0)
+                req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT,
+                                     0);
+        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
+        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT,
+                             ea2len);
+
+        rc = mdc_prep_elc_req(exp, req, &cancels, count);
+        if (rc) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
         if (op_data->op_attr.ia_valid & ATTR_FROM_OPEN) {
                 req->rq_request_portal = MDS_SETATTR_PORTAL; //XXX FIXME bug 249
@@ -159,11 +154,9 @@ int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
                 CDEBUG(D_INODE, "setting mtime %lu, ctime %lu\n",
                        LTIME_S(op_data->op_attr.ia_mtime),
                        LTIME_S(op_data->op_attr.ia_ctime));
-        mdc_setattr_pack(req, REQ_REC_OFF, op_data, ea, ealen, ea2, ea2len);
+        mdc_setattr_pack(req, op_data, ea, ealen, ea2, ea2len);
 
-        size[REPLY_REC_OFF] = sizeof(struct mdt_body);
-        size[REPLY_REC_OFF + 1] = sizeof(struct lustre_capa);
-        ptlrpc_req_set_repsize(req, 3, size);
+        ptlrpc_request_set_replen(req);
         if (mod && (op_data->op_flags & MF_EPOCH_OPEN) &&
             req->rq_import->imp_replayable)
         {
@@ -201,12 +194,8 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
                const void *data, int datalen, int mode, __u32 uid, __u32 gid,
                __u32 cap_effective, __u64 rdev, struct ptlrpc_request **request)
 {
-        int size[6] = { sizeof(struct ptlrpc_body),
-                        sizeof(struct mdt_rec_create),
-                        0, op_data->op_namelen + 1, 0, 0 };
-        struct obd_device *obd = exp->exp_obd;
-        int level, bufcount = 4, rc;
         struct ptlrpc_request *req;
+        int level, rc;
         int count = 0;
         CFS_LIST_HEAD(cancels);
         ENTRY;
@@ -224,41 +213,43 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
                 }
         }
 
-        size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
-                sizeof(struct lustre_capa) : 0;
-        
-        if (data && datalen) {
-                size[bufcount] = datalen;
-                bufcount++;
-        }
-
         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) && 
             (fid_is_sane(&op_data->op_fid1)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
                                                 &cancels, LCK_EX,
                                                 MDS_INODELOCK_UPDATE);
-        if (exp_connect_cancelset(exp) && count)
-                bufcount = 6;
-        req = mdc_prep_elc_req(exp, bufcount, size,
-                               REQ_REC_OFF + 4, &cancels, count);
 
-        if (req == NULL)
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_MDS_REINT_CREATE_RMT_ACL);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 RETURN(-ENOMEM);
+        }
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                             op_data->op_namelen + 1);
+        req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
+                             data && datalen ? datalen : 0);
+
+        rc = mdc_prep_elc_req(exp, req, &cancels, count);
+        if (rc) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
         /*
          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
          * tgt, for symlinks or lov MD data.
          */
-        mdc_create_pack(req, REQ_REC_OFF, op_data, data, datalen, mode, uid,
+        mdc_create_pack(req, op_data, data, datalen, mode, uid,
                         gid, cap_effective, rdev);
 
-        size[REPLY_REC_OFF] = sizeof(struct mdt_body);
-        size[REPLY_REC_OFF + 1] = sizeof(struct lustre_capa);
-        ptlrpc_req_set_repsize(req, 3, size);
+        ptlrpc_request_set_replen(req);
 
         level = LUSTRE_IMP_FULL;
  resend:
-        rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, level);
+        rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level);
         
         /* Resend if we were told to. */
         if (rc == -ERESTARTSYS) {
@@ -268,16 +259,13 @@ int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
                 struct mdt_body *body;
                 struct lustre_capa *capa;
 
-                body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
-                                      sizeof(*body));
+                body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
                 LASSERT(body);
                 if (body->valid & OBD_MD_FLMDSCAPA) {
-                        capa = lustre_unpack_capa(req->rq_repmsg,
-                                                  REPLY_REC_OFF + 1);
-                        if (capa == NULL) {
-                                CERROR("Missing/short MDS capability\n");
+                        capa = req_capsule_server_get(&req->rq_pill,
+                                                      &RMF_CAPA1);
+                        if (capa == NULL)
                                 rc = -EPROTO;
-                        }
                 }
         }
 
@@ -291,17 +279,11 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
         CFS_LIST_HEAD(cancels);
         struct obd_device *obd = class_exp2obd(exp);
         struct ptlrpc_request *req = *request;
-        int size[5] = { sizeof(struct ptlrpc_body),
-                        sizeof(struct mdt_rec_unlink),
-                        0, op_data->op_namelen + 1, 0 };
-        int count = 0, rc, bufcount = 4;
+        int count = 0, rc;
         ENTRY;
 
         LASSERT(req == NULL);
 
-        size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
-                sizeof(struct lustre_capa) : 0;
-
         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) && 
             (fid_is_sane(&op_data->op_fid1)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
@@ -312,22 +294,32 @@ int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
                                                  &cancels, LCK_EX,
                                                  MDS_INODELOCK_FULL);
-        if (exp_connect_cancelset(exp) && count)
-                bufcount = 5;
-        
-        req = mdc_prep_elc_req(exp, bufcount, size,
-                               REQ_REC_OFF + 3, &cancels, count);
-
-        if (req == NULL)
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_MDS_REINT_UNLINK);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 RETURN(-ENOMEM);
-        *request = req;
+        }
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                             op_data->op_namelen + 1);
+
+        rc = mdc_prep_elc_req(exp, req, &cancels, count);
+        if (rc) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
+
+        mdc_unlink_pack(req, op_data);
 
-        size[REPLY_REC_OFF] = sizeof(struct mdt_body);
-        size[REPLY_REC_OFF + 1] = obd->u.cli.cl_max_mds_easize;
-        size[REPLY_REC_OFF + 2] = obd->u.cli.cl_max_mds_cookiesize;
-        ptlrpc_req_set_repsize(req, 4, size);
+        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                             obd->u.cli.cl_max_mds_easize);
+        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
+                             obd->u.cli.cl_max_mds_cookiesize);
+        ptlrpc_request_set_replen(req);
 
-        mdc_unlink_pack(req, REQ_REC_OFF, op_data);
+        *request = req;
 
         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
         if (rc == -ERESTARTSYS)
@@ -341,17 +333,9 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
         CFS_LIST_HEAD(cancels);
         struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req;
-        int size[6] = { sizeof(struct ptlrpc_body),
-                        sizeof(struct mdt_rec_link),
-                        0, 0, op_data->op_namelen + 1, 0 };
-        int count = 0, rc, bufcount = 5;
+        int count = 0, rc;
         ENTRY;
 
-        size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
-                sizeof(struct lustre_capa) : 0;
-        size[REQ_REC_OFF + 2] = op_data->op_capa2 ?
-                sizeof(struct lustre_capa) : 0;
-
         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
             (fid_is_sane(&op_data->op_fid2)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
@@ -362,18 +346,26 @@ int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
                 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
                                                  &cancels, LCK_EX,
                                                  MDS_INODELOCK_UPDATE);
-        if (exp_connect_cancelset(exp) && count)
-                bufcount = 6;
-        req = mdc_prep_elc_req(exp, bufcount, size,
-                               REQ_REC_OFF + 4, &cancels, count);
 
-        if (req == NULL)
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_LINK);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 RETURN(-ENOMEM);
+        }
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
+                             op_data->op_namelen + 1);
+
+        rc = mdc_prep_elc_req(exp, req, &cancels, count);
+        if (rc) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
 
-        mdc_link_pack(req, REQ_REC_OFF, op_data);
-
-        size[REPLY_REC_OFF] = sizeof(struct mdt_body);
-        ptlrpc_req_set_repsize(req, 2, size);
+        mdc_link_pack(req, op_data);
+        ptlrpc_request_set_replen(req);
 
         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
         *request = req;
@@ -390,17 +382,9 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
         CFS_LIST_HEAD(cancels);
         struct obd_device *obd = exp->exp_obd;
         struct ptlrpc_request *req;
-        int size[7] = { sizeof(struct ptlrpc_body),
-                        sizeof(struct mdt_rec_rename),
-                        0, 0, oldlen + 1, newlen + 1, 0 };
-        int count = 0, rc, bufcount = 6;
+        int count = 0, rc;
         ENTRY;
 
-        size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
-                                        sizeof(struct lustre_capa) : 0;
-        size[REQ_REC_OFF + 2] = op_data->op_capa2 ?
-                                        sizeof(struct lustre_capa) : 0;
-
         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
             (fid_is_sane(&op_data->op_fid1)))
                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
@@ -421,20 +405,36 @@ int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
                 count += mdc_resource_get_unused(exp, &op_data->op_fid4,
                                                  &cancels, LCK_EX,
                                                  MDS_INODELOCK_FULL);
-        if (exp_connect_cancelset(exp) && count)
-                bufcount = 7;
-        req = mdc_prep_elc_req(exp, bufcount, size,
-                               REQ_REC_OFF + 5, &cancels, count);
 
-        if (req == NULL)
+        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                   &RQF_MDS_REINT_RENAME);
+        if (req == NULL) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
                 RETURN(-ENOMEM);
+        }
+
+        mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+        mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
+        req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
+        req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
+
+        rc = mdc_prep_elc_req(exp, req, &cancels, count);
+        if (rc) {
+                ldlm_lock_list_put(&cancels, l_bl_ast, count);
+                ptlrpc_request_free(req);
+                RETURN(rc);
+        }
+
+        if (exp_connect_cancelset(exp) && req)
+                ldlm_cli_cancel_list(&cancels, count, req, 0);
 
-        mdc_rename_pack(req, REQ_REC_OFF, op_data, old, oldlen, new, newlen);
+        mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
 
-        size[REPLY_REC_OFF] = sizeof(struct mdt_body);
-        size[REPLY_REC_OFF + 1] = obd->u.cli.cl_max_mds_easize;
-        size[REPLY_REC_OFF + 2] = obd->u.cli.cl_max_mds_cookiesize;
-        ptlrpc_req_set_repsize(req, 4, size);
+        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                             obd->u.cli.cl_max_mds_easize);
+        req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
+                             obd->u.cli.cl_max_mds_cookiesize);
+        ptlrpc_request_set_replen(req);
 
         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
         *request = req;