Whamcloud - gitweb
LU-1330 obdclass: splits server-side object stack from client
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index df9376b..9f3dea4 100644 (file)
@@ -48,7 +48,6 @@
 #include <lustre_acl.h>
 #include <obd_class.h>
 #include <lustre_fid.h>
-#include <md_object.h>
 #include <lprocfs_status.h>
 #include <lustre_param.h>
 #include <lustre_log.h>
@@ -87,6 +86,24 @@ int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
         }
 }
 
+static inline int mdc_queue_wait(struct ptlrpc_request *req)
+{
+       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+       int rc;
+
+       /* mdc_enter_request() ensures that this client has no more
+        * than cl_max_rpcs_in_flight RPCs simultaneously inf light
+        * against an MDT. */
+       rc = mdc_enter_request(cli);
+       if (rc != 0)
+               return rc;
+
+       rc = ptlrpc_queue_wait(req);
+       mdc_exit_request(cli);
+
+       return rc;
+}
+
 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
 static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid,
@@ -992,9 +1009,9 @@ int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
         req->rq_request_portal = MDS_READPAGE_PORTAL;
         ptlrpc_at_set_req_timeout(req);
 
-        desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL);
-        if (desc == NULL)
-                GOTO(out, rc = -ENOMEM);
+       desc = ptlrpc_prep_bulk_imp(req, 1, 1,BULK_GET_SOURCE, MDS_BULK_PORTAL);
+       if (desc == NULL)
+               GOTO(out, rc = -ENOMEM);
 
         /* NB req now owns desc and will free it when it gets freed. */
         ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset);
@@ -1044,8 +1061,8 @@ restart_bulk:
         req->rq_request_portal = MDS_READPAGE_PORTAL;
         ptlrpc_at_set_req_timeout(req);
 
-        desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, BULK_PUT_SINK,
-                                    MDS_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, 1, BULK_PUT_SINK,
+                                   MDS_BULK_PORTAL);
         if (desc == NULL) {
                 ptlrpc_request_free(req);
                 RETURN(-ENOMEM);
@@ -1180,9 +1197,9 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
         /* Val is struct getinfo_fid2path result plus path */
         vallen = sizeof(*gf) + gf->gf_pathlen;
 
-        rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
-        if (rc)
-                GOTO(out, rc);
+       rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
+       if (rc != 0 && rc != -EREMOTE)
+               GOTO(out, rc);
 
         if (vallen <= sizeof(*gf))
                 GOTO(out, rc = -EPROTO);
@@ -1211,14 +1228,18 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp,
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
+       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
+
        /* Copy hsm_progress struct */
        req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS);
-       LASSERT(req_hpk);
+       if (req_hpk == NULL)
+               GOTO(out, rc = -EPROTO);
+
        *req_hpk = *hpk;
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
@@ -1238,15 +1259,19 @@ static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives)
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
+       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
+
        /* Copy hsm_progress struct */
        archive_mask = req_capsule_client_get(&req->rq_pill,
                                              &RMF_MDS_HSM_ARCHIVE);
-       LASSERT(archive_mask);
+       if (archive_mask == NULL)
+               GOTO(out, rc = -EPROTO);
+
        *archive_mask = archives;
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
@@ -1280,7 +1305,7 @@ static int mdc_ioc_hsm_current_action(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        if (rc)
                GOTO(out, rc);
 
@@ -1309,9 +1334,11 @@ static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp)
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
+       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
+
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
@@ -1345,7 +1372,7 @@ static int mdc_ioc_hsm_state_get(struct obd_export *exp,
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        if (rc)
                GOTO(out, rc);
 
@@ -1388,12 +1415,13 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp,
 
        /* Copy states */
        req_hss = req_capsule_client_get(&req->rq_pill, &RMF_HSM_STATE_SET);
-       LASSERT(req_hss);
+       if (req_hss == NULL)
+               GOTO(out, rc = -EPROTO);
        *req_hss = *hss;
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        GOTO(out, rc);
 
        EXIT;
@@ -1433,23 +1461,26 @@ static int mdc_ioc_hsm_request(struct obd_export *exp,
 
        /* Copy hsm_request struct */
        req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST);
-       LASSERT(req_hr);
+       if (req_hr == NULL)
+               GOTO(out, rc = -EPROTO);
        *req_hr = hur->hur_request;
 
        /* Copy hsm_user_item structs */
        req_hui = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM);
-       LASSERT(req_hui);
+       if (req_hui == NULL)
+               GOTO(out, rc = -EPROTO);
        memcpy(req_hui, hur->hur_user_item,
               hur->hur_request.hr_itemcount * sizeof(struct hsm_user_item));
 
        /* Copy opaque field */
        req_opaque = req_capsule_client_get(&req->rq_pill, &RMF_GENERIC_DATA);
-       LASSERT(req_opaque);
+       if (req_opaque == NULL)
+               GOTO(out, rc = -EPROTO);
        memcpy(req_opaque, hur_data(hur), hur->hur_request.hr_data_len);
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        GOTO(out, rc);
 
 out:
@@ -1474,11 +1505,11 @@ static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags)
 #define D_CHANGELOG 0
 
 struct changelog_show {
-        __u64       cs_startrec;
-        __u32       cs_flags;
-        cfs_file_t *cs_fp;
-        char       *cs_buf;
-        struct obd_device *cs_obd;
+       __u64           cs_startrec;
+       __u32           cs_flags;
+       struct file     *cs_fp;
+       char            *cs_buf;
+       struct obd_device *cs_obd;
 };
 
 static int changelog_show_cb(const struct lu_env *env, struct llog_handle *llh,
@@ -1572,8 +1603,8 @@ static int mdc_changelog_send_thread(void *csdata)
         }
 
 out:
-        cfs_put_file(cs->cs_fp);
-        if (llh)
+       fput(cs->cs_fp);
+       if (llh)
                llog_cat_close(NULL, llh);
         if (ctxt)
                 llog_ctxt_put(ctxt);
@@ -1596,11 +1627,11 @@ static int mdc_ioc_changelog_send(struct obd_device *obd,
         if (!cs)
                 return -ENOMEM;
 
-        cs->cs_obd = obd;
-        cs->cs_startrec = icc->icc_recno;
-        /* matching cfs_put_file in mdc_changelog_send_thread */
-        cs->cs_fp = cfs_get_fd(icc->icc_id);
-        cs->cs_flags = icc->icc_flags;
+       cs->cs_obd = obd;
+       cs->cs_startrec = icc->icc_recno;
+       /* matching fput in mdc_changelog_send_thread */
+       cs->cs_fp = fget(icc->icc_id);
+       cs->cs_flags = icc->icc_flags;
 
         /* New thread because we should return to user app before
            writing into our pipe */
@@ -1942,19 +1973,20 @@ int mdc_get_info_rpc(struct obd_export *exp,
                              RCL_SERVER, vallen);
         ptlrpc_request_set_replen(req);
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc == 0) {
-                tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
-                memcpy(val, tmp, vallen);
-                if (ptlrpc_rep_need_swab(req)) {
-                        if (KEY_IS(KEY_FID2PATH)) {
-                                lustre_swab_fid2path(val);
-                        }
-                }
-        }
-        ptlrpc_req_finished(req);
+       rc = ptlrpc_queue_wait(req);
+       /* -EREMOTE means the get_info result is partial, and it needs to
+        * continue on another MDT, see fid2path part in lmv_iocontrol */
+       if (rc == 0 || rc == -EREMOTE) {
+               tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
+               memcpy(val, tmp, vallen);
+               if (ptlrpc_rep_need_swab(req)) {
+                       if (KEY_IS(KEY_FID2PATH))
+                               lustre_swab_fid2path(val);
+               }
+       }
+       ptlrpc_req_finished(req);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static void lustre_swab_hai(struct hsm_action_item *h)
@@ -2014,11 +2046,12 @@ static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
                if (rc == 0)
                        rc = mdc_ioc_hsm_ct_unregister(imp);
        } else {
-               cfs_file_t *fp = cfs_get_fd(lk->lk_wfd);
+               struct file *fp = fget(lk->lk_wfd);
+
                rc = libcfs_kkuc_group_add(fp, lk->lk_uid, lk->lk_group,
                                           lk->lk_data);
                if (rc && fp)
-                       cfs_put_file(fp);
+                       fput(fp);
                if (rc == 0)
                        rc = mdc_ioc_hsm_ct_register(imp, archive);
        }
@@ -2395,6 +2428,18 @@ static int mdc_cancel_for_recovery(struct ldlm_lock *lock)
         RETURN(1);
 }
 
+static int mdc_resource_inode_free(struct ldlm_resource *res)
+{
+       if (res->lr_lvb_inode)
+               res->lr_lvb_inode = NULL;
+
+       return 0;
+}
+
+struct ldlm_valblock_ops inode_lvbo = {
+       lvbo_free: mdc_resource_inode_free
+};
+
 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 {
         struct client_obd *cli = &obd->u.cli;
@@ -2424,6 +2469,8 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 
         ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery);
 
+       obd->obd_namespace->ns_lvbo = &inode_lvbo;
+
         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
         if (rc) {
                 mdc_cleanup(obd);
@@ -2699,7 +2746,7 @@ struct obd_ops mdc_obd_ops = {
 
 struct md_ops mdc_md_ops = {
         .m_getstatus        = mdc_getstatus,
-        .m_change_cbdata    = mdc_change_cbdata,
+        .m_null_inode      = mdc_null_inode,
         .m_find_cbdata      = mdc_find_cbdata,
         .m_close            = mdc_close,
         .m_create           = mdc_create,