Whamcloud - gitweb
LU-1330 obdclass: splits server-side object stack from client
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index a76f279..9f3dea4 100644 (file)
@@ -48,7 +48,6 @@
 #include <lustre_acl.h>
 #include <obd_class.h>
 #include <lustre_fid.h>
-#include <md_object.h>
 #include <lprocfs_status.h>
 #include <lustre_param.h>
 #include <lustre_log.h>
@@ -87,6 +86,24 @@ int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req,
         }
 }
 
+static inline int mdc_queue_wait(struct ptlrpc_request *req)
+{
+       struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+       int rc;
+
+       /* mdc_enter_request() ensures that this client has no more
+        * than cl_max_rpcs_in_flight RPCs simultaneously inf light
+        * against an MDT. */
+       rc = mdc_enter_request(cli);
+       if (rc != 0)
+               return rc;
+
+       rc = ptlrpc_queue_wait(req);
+       mdc_exit_request(cli);
+
+       return rc;
+}
+
 /* Helper that implements most of mdc_getstatus and signal_completed_replay. */
 /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */
 static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid,
@@ -992,9 +1009,9 @@ int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
         req->rq_request_portal = MDS_READPAGE_PORTAL;
         ptlrpc_at_set_req_timeout(req);
 
-        desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL);
-        if (desc == NULL)
-                GOTO(out, rc = -ENOMEM);
+       desc = ptlrpc_prep_bulk_imp(req, 1, 1,BULK_GET_SOURCE, MDS_BULK_PORTAL);
+       if (desc == NULL)
+               GOTO(out, rc = -ENOMEM);
 
         /* NB req now owns desc and will free it when it gets freed. */
         ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset);
@@ -1044,8 +1061,8 @@ restart_bulk:
         req->rq_request_portal = MDS_READPAGE_PORTAL;
         ptlrpc_at_set_req_timeout(req);
 
-        desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, BULK_PUT_SINK,
-                                    MDS_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, 1, BULK_PUT_SINK,
+                                   MDS_BULK_PORTAL);
         if (desc == NULL) {
                 ptlrpc_request_free(req);
                 RETURN(-ENOMEM);
@@ -1180,9 +1197,9 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf)
         /* Val is struct getinfo_fid2path result plus path */
         vallen = sizeof(*gf) + gf->gf_pathlen;
 
-        rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
-        if (rc)
-                GOTO(out, rc);
+       rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL);
+       if (rc != 0 && rc != -EREMOTE)
+               GOTO(out, rc);
 
         if (vallen <= sizeof(*gf))
                 GOTO(out, rc = -EPROTO);
@@ -1211,14 +1228,18 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp,
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
+       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
+
        /* Copy hsm_progress struct */
        req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS);
-       LASSERT(req_hpk);
+       if (req_hpk == NULL)
+               GOTO(out, rc = -EPROTO);
+
        *req_hpk = *hpk;
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
@@ -1238,21 +1259,69 @@ static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives)
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
+       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
+
        /* Copy hsm_progress struct */
        archive_mask = req_capsule_client_get(&req->rq_pill,
                                              &RMF_MDS_HSM_ARCHIVE);
-       LASSERT(archive_mask);
+       if (archive_mask == NULL)
+               GOTO(out, rc = -EPROTO);
+
        *archive_mask = archives;
 
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
        GOTO(out, rc);
 out:
        ptlrpc_req_finished(req);
        return rc;
 }
 
+static int mdc_ioc_hsm_current_action(struct obd_export *exp,
+                                     struct md_op_data *op_data)
+{
+       struct hsm_current_action       *hca = op_data->op_data;
+       struct hsm_current_action       *req_hca;
+       struct ptlrpc_request           *req;
+       int                              rc;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                  &RQF_MDS_HSM_ACTION);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_ACTION);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
+                     OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
+
+       ptlrpc_request_set_replen(req);
+
+       rc = mdc_queue_wait(req);
+       if (rc)
+               GOTO(out, rc);
+
+       req_hca = req_capsule_server_get(&req->rq_pill,
+                                        &RMF_MDS_HSM_CURRENT_ACTION);
+       if (req_hca == NULL)
+               GOTO(out, rc = -EPROTO);
+
+       *hca = *req_hca;
+
+       EXIT;
+out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
 static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp)
 {
        struct ptlrpc_request   *req;
@@ -1265,10 +1334,155 @@ static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp)
        if (req == NULL)
                GOTO(out, rc = -ENOMEM);
 
+       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
+
        ptlrpc_request_set_replen(req);
 
-       rc = ptlrpc_queue_wait(req);
+       rc = mdc_queue_wait(req);
+       GOTO(out, rc);
+out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
+static int mdc_ioc_hsm_state_get(struct obd_export *exp,
+                                struct md_op_data *op_data)
+{
+       struct hsm_user_state   *hus = op_data->op_data;
+       struct hsm_user_state   *req_hus;
+       struct ptlrpc_request   *req;
+       int                      rc;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                  &RQF_MDS_HSM_STATE_GET);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_GET);
+       if (rc != 0) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
+                     OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
+
+       ptlrpc_request_set_replen(req);
+
+       rc = mdc_queue_wait(req);
+       if (rc)
+               GOTO(out, rc);
+
+       req_hus = req_capsule_server_get(&req->rq_pill, &RMF_HSM_USER_STATE);
+       if (req_hus == NULL)
+               GOTO(out, rc = -EPROTO);
+
+       *hus = *req_hus;
+
+       EXIT;
+out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
+static int mdc_ioc_hsm_state_set(struct obd_export *exp,
+                                struct md_op_data *op_data)
+{
+       struct hsm_state_set    *hss = op_data->op_data;
+       struct hsm_state_set    *req_hss;
+       struct ptlrpc_request   *req;
+       int                      rc;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                  &RQF_MDS_HSM_STATE_SET);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_SET);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1,
+                     OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0);
+
+       /* Copy states */
+       req_hss = req_capsule_client_get(&req->rq_pill, &RMF_HSM_STATE_SET);
+       if (req_hss == NULL)
+               GOTO(out, rc = -EPROTO);
+       *req_hss = *hss;
+
+       ptlrpc_request_set_replen(req);
+
+       rc = mdc_queue_wait(req);
        GOTO(out, rc);
+
+       EXIT;
+out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
+static int mdc_ioc_hsm_request(struct obd_export *exp,
+                              struct hsm_user_request *hur)
+{
+       struct obd_import       *imp = class_exp2cliimp(exp);
+       struct ptlrpc_request   *req;
+       struct hsm_request      *req_hr;
+       struct hsm_user_item    *req_hui;
+       char                    *req_opaque;
+       int                      rc;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_REQUEST);
+       if (req == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM, RCL_CLIENT,
+                            hur->hur_request.hr_itemcount
+                            * sizeof(struct hsm_user_item));
+       req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA, RCL_CLIENT,
+                            hur->hur_request.hr_data_len);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_REQUEST);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0);
+
+       /* Copy hsm_request struct */
+       req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST);
+       if (req_hr == NULL)
+               GOTO(out, rc = -EPROTO);
+       *req_hr = hur->hur_request;
+
+       /* Copy hsm_user_item structs */
+       req_hui = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM);
+       if (req_hui == NULL)
+               GOTO(out, rc = -EPROTO);
+       memcpy(req_hui, hur->hur_user_item,
+              hur->hur_request.hr_itemcount * sizeof(struct hsm_user_item));
+
+       /* Copy opaque field */
+       req_opaque = req_capsule_client_get(&req->rq_pill, &RMF_GENERIC_DATA);
+       if (req_opaque == NULL)
+               GOTO(out, rc = -EPROTO);
+       memcpy(req_opaque, hur_data(hur), hur->hur_request.hr_data_len);
+
+       ptlrpc_request_set_replen(req);
+
+       rc = mdc_queue_wait(req);
+       GOTO(out, rc);
+
 out:
        ptlrpc_req_finished(req);
        return rc;
@@ -1291,11 +1505,11 @@ static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags)
 #define D_CHANGELOG 0
 
 struct changelog_show {
-        __u64       cs_startrec;
-        __u32       cs_flags;
-        cfs_file_t *cs_fp;
-        char       *cs_buf;
-        struct obd_device *cs_obd;
+       __u64           cs_startrec;
+       __u32           cs_flags;
+       struct file     *cs_fp;
+       char            *cs_buf;
+       struct obd_device *cs_obd;
 };
 
 static int changelog_show_cb(const struct lu_env *env, struct llog_handle *llh,
@@ -1389,8 +1603,8 @@ static int mdc_changelog_send_thread(void *csdata)
         }
 
 out:
-        cfs_put_file(cs->cs_fp);
-        if (llh)
+       fput(cs->cs_fp);
+       if (llh)
                llog_cat_close(NULL, llh);
         if (ctxt)
                 llog_ctxt_put(ctxt);
@@ -1413,11 +1627,11 @@ static int mdc_ioc_changelog_send(struct obd_device *obd,
         if (!cs)
                 return -ENOMEM;
 
-        cs->cs_obd = obd;
-        cs->cs_startrec = icc->icc_recno;
-        /* matching cfs_put_file in mdc_changelog_send_thread */
-        cs->cs_fp = cfs_get_fd(icc->icc_id);
-        cs->cs_flags = icc->icc_flags;
+       cs->cs_obd = obd;
+       cs->cs_startrec = icc->icc_recno;
+       /* matching fput in mdc_changelog_send_thread */
+       cs->cs_fp = fget(icc->icc_id);
+       cs->cs_flags = icc->icc_flags;
 
         /* New thread because we should return to user app before
            writing into our pipe */
@@ -1519,6 +1733,63 @@ static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp,
         RETURN(rc);
 }
 
+static int mdc_ioc_swap_layouts(struct obd_export *exp,
+                               struct md_op_data *op_data)
+{
+       CFS_LIST_HEAD(cancels);
+       struct ptlrpc_request   *req;
+       int                      rc, count;
+       struct mdc_swap_layouts *msl, *payload;
+       ENTRY;
+
+       msl = op_data->op_data;
+
+       /* When the MDT will get the MDS_SWAP_LAYOUTS RPC the
+        * first thing it will do is to cancel the 2 layout
+        * locks hold by this client.
+        * So the client must cancel its layout locks on the 2 fids
+        * with the request RPC to avoid extra RPC round trips
+        */
+       count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels,
+                                       LCK_CR, MDS_INODELOCK_LAYOUT);
+       count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels,
+                                        LCK_CR, MDS_INODELOCK_LAYOUT);
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp),
+                                  &RQF_MDS_SWAP_LAYOUTS);
+       if (req == NULL) {
+               ldlm_lock_list_put(&cancels, l_bl_ast, count);
+               RETURN(-ENOMEM);
+       }
+
+       mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+       mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
+
+       rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(rc);
+       }
+
+       mdc_swap_layouts_pack(req, op_data);
+
+       payload = req_capsule_client_get(&req->rq_pill, &RMF_SWAP_LAYOUTS);
+       LASSERT(payload);
+
+       *payload = *msl;
+
+       ptlrpc_request_set_replen(req);
+
+       rc = ptlrpc_queue_wait(req);
+       if (rc)
+               GOTO(out, rc);
+       EXIT;
+
+out:
+       ptlrpc_req_finished(req);
+       return rc;
+}
+
 static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                          void *karg, void *uarg)
 {
@@ -1555,6 +1826,17 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
        case LL_IOC_HSM_PROGRESS:
                rc = mdc_ioc_hsm_progress(exp, karg);
                GOTO(out, rc);
+       case LL_IOC_HSM_STATE_GET:
+               rc = mdc_ioc_hsm_state_get(exp, karg);
+               GOTO(out, rc);
+       case LL_IOC_HSM_STATE_SET:
+               rc = mdc_ioc_hsm_state_set(exp, karg);
+       case LL_IOC_HSM_ACTION:
+               rc = mdc_ioc_hsm_current_action(exp, karg);
+               GOTO(out, rc);
+       case LL_IOC_HSM_REQUEST:
+               rc = mdc_ioc_hsm_request(exp, karg);
+               GOTO(out, rc);
         case OBD_IOC_CLIENT_RECOVER:
                 rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0);
                 if (rc < 0)
@@ -1635,13 +1917,18 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 OBD_FREE_PTR(oqctl);
                 break;
         }
-        case LL_IOC_GET_CONNECT_FLAGS: {
-                if (cfs_copy_to_user(uarg, &exp->exp_connect_flags,
-                                     sizeof(__u64)))
-                        GOTO(out, rc = -EFAULT);
-                else
-                        GOTO(out, rc = 0);
-        }
+       case LL_IOC_GET_CONNECT_FLAGS: {
+               if (cfs_copy_to_user(uarg,
+                                    exp_connect_flags_ptr(exp),
+                                    sizeof(__u64)))
+                       GOTO(out, rc = -EFAULT);
+               else
+                       GOTO(out, rc = 0);
+       }
+       case LL_IOC_LOV_SWAP_LAYOUTS: {
+               rc = mdc_ioc_swap_layouts(exp, karg);
+               break;
+       }
         default:
                 CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd);
                 GOTO(out, rc = -ENOTTY);
@@ -1686,19 +1973,20 @@ int mdc_get_info_rpc(struct obd_export *exp,
                              RCL_SERVER, vallen);
         ptlrpc_request_set_replen(req);
 
-        rc = ptlrpc_queue_wait(req);
-        if (rc == 0) {
-                tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
-                memcpy(val, tmp, vallen);
-                if (ptlrpc_rep_need_swab(req)) {
-                        if (KEY_IS(KEY_FID2PATH)) {
-                                lustre_swab_fid2path(val);
-                        }
-                }
-        }
-        ptlrpc_req_finished(req);
+       rc = ptlrpc_queue_wait(req);
+       /* -EREMOTE means the get_info result is partial, and it needs to
+        * continue on another MDT, see fid2path part in lmv_iocontrol */
+       if (rc == 0 || rc == -EREMOTE) {
+               tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL);
+               memcpy(val, tmp, vallen);
+               if (ptlrpc_rep_need_swab(req)) {
+                       if (KEY_IS(KEY_FID2PATH))
+                               lustre_swab_fid2path(val);
+               }
+       }
+       ptlrpc_req_finished(req);
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 static void lustre_swab_hai(struct hsm_action_item *h)
@@ -1720,7 +2008,7 @@ static void lustre_swab_hal(struct hsm_action_list *h)
 
        __swab32s(&h->hal_version);
        __swab32s(&h->hal_count);
-       __swab32s(&h->hal_archive_num);
+       __swab32s(&h->hal_archive_id);
        __swab64s(&h->hal_flags);
        hai = hai_zero(h);
        for (i = 0; i < h->hal_count; i++) {
@@ -1758,11 +2046,12 @@ static int mdc_ioc_hsm_ct_start(struct obd_export *exp,
                if (rc == 0)
                        rc = mdc_ioc_hsm_ct_unregister(imp);
        } else {
-               cfs_file_t *fp = cfs_get_fd(lk->lk_wfd);
+               struct file *fp = fget(lk->lk_wfd);
+
                rc = libcfs_kkuc_group_add(fp, lk->lk_uid, lk->lk_group,
                                           lk->lk_data);
                if (rc && fp)
-                       cfs_put_file(fp);
+                       fput(fp);
                if (rc == 0)
                        rc = mdc_ioc_hsm_ct_register(imp, archive);
        }
@@ -2139,6 +2428,18 @@ static int mdc_cancel_for_recovery(struct ldlm_lock *lock)
         RETURN(1);
 }
 
+static int mdc_resource_inode_free(struct ldlm_resource *res)
+{
+       if (res->lr_lvb_inode)
+               res->lr_lvb_inode = NULL;
+
+       return 0;
+}
+
+struct ldlm_valblock_ops inode_lvbo = {
+       lvbo_free: mdc_resource_inode_free
+};
+
 static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 {
         struct client_obd *cli = &obd->u.cli;
@@ -2168,6 +2469,8 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg)
 
         ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery);
 
+       obd->obd_namespace->ns_lvbo = &inode_lvbo;
+
         rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL);
         if (rc) {
                 mdc_cleanup(obd);
@@ -2443,7 +2746,7 @@ struct obd_ops mdc_obd_ops = {
 
 struct md_ops mdc_md_ops = {
         .m_getstatus        = mdc_getstatus,
-        .m_change_cbdata    = mdc_change_cbdata,
+        .m_null_inode      = mdc_null_inode,
         .m_find_cbdata      = mdc_find_cbdata,
         .m_close            = mdc_close,
         .m_create           = mdc_create,