X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_request.c;h=2a5b2c7537def5f378f13c15cf041963960c7406;hp=291bb0362b0d28219975ff323a749af6cde32afd;hb=3e76334402a71e37aed4e9a5166d0141e30af375;hpb=55ca00c3d1cd8635258ccbda27ee3f0f9b2966a8 diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 291bb03..2a5b2c7 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -38,9 +38,10 @@ #include #include #include -#ifdef HAVE_UIDGID_HEADER -# include -#endif +#include +#include +#include +#include #include @@ -48,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -239,8 +241,9 @@ again: rc = mdc_getattr_common(exp, req); if (rc) { if (rc == -ERANGE) { - acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize, - XATTR_SIZE_MAX); + acl_bufsize = min_t(__u32, + imp->imp_connect_data.ocd_max_easize, + XATTR_SIZE_MAX); mdc_reset_acl_req(req); goto again; } @@ -294,8 +297,9 @@ again: rc = mdc_getattr_common(exp, req); if (rc) { if (rc == -ERANGE) { - acl_bufsize = MIN(imp->imp_connect_data.ocd_max_easize, - XATTR_SIZE_MAX); + acl_bufsize = min_t(__u32, + imp->imp_connect_data.ocd_max_easize, + XATTR_SIZE_MAX); mdc_reset_acl_req(req); goto again; } @@ -348,7 +352,7 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, /* Flush local XATTR locks to get rid of a possible cancel RPC */ if (opcode == MDS_REINT && fid_is_sane(fid) && exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR) { - struct list_head cancels = LIST_HEAD_INIT(cancels); + LIST_HEAD(cancels); int count; /* Without that packing would fail */ @@ -376,8 +380,8 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, if (opcode == MDS_REINT) { struct mdt_rec_setxattr *rec; - CLASSERT(sizeof(struct mdt_rec_setxattr) == - sizeof(struct mdt_rec_reint)); + BUILD_BUG_ON(sizeof(struct mdt_rec_setxattr) != + sizeof(struct mdt_rec_reint)); rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT); rec->sx_opcode = REINT_SETXATTR; rec->sx_fsuid = from_kuid(&init_user_ns, current_fsuid()); @@ -412,12 +416,12 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, /* make rpc */ if (opcode == MDS_REINT) - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); if (opcode == MDS_REINT) - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); if (rc) ptlrpc_req_finished(req); @@ -451,6 +455,9 @@ static int mdc_getxattr(struct obd_export *exp, const struct lu_fid *fid, LASSERT(obd_md_valid == OBD_MD_FLXATTR || obd_md_valid == OBD_MD_FLXATTRLS); + /* Message below is checked in sanity-selinux test_20d + * and sanity-sec test_49 + */ CDEBUG(D_INFO, "%s: get xattr '%s' for "DFID"\n", exp->exp_obd->obd_name, name, PFID(fid)); rc = mdc_xattr_common(exp, &RQF_MDS_GETXATTR, fid, MDS_GETXATTR, @@ -500,50 +507,10 @@ out: return rc; } -#ifdef CONFIG_FS_POSIX_ACL -static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md) -{ - struct req_capsule *pill = &req->rq_pill; - struct mdt_body *body = md->body; - struct posix_acl *acl; - void *buf; - int rc; - ENTRY; - - if (!body->mbo_aclsize) - RETURN(0); - - buf = req_capsule_server_sized_get(pill, &RMF_ACL, body->mbo_aclsize); - - if (!buf) - RETURN(-EPROTO); - - acl = posix_acl_from_xattr(&init_user_ns, buf, body->mbo_aclsize); - if (acl == NULL) - RETURN(0); - if (IS_ERR(acl)) { - rc = PTR_ERR(acl); - CERROR("convert xattr to acl: %d\n", rc); - RETURN(rc); - } - - rc = posix_acl_valid(&init_user_ns, acl); - if (rc) { - CERROR("validate acl: %d\n", rc); - posix_acl_release(acl); - RETURN(rc); - } - - md->posix_acl = acl; - RETURN(0); -} -#else -#define mdc_unpack_acl(req, md) 0 -#endif - -int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, - struct obd_export *dt_exp, struct obd_export *md_exp, - struct lustre_md *md) +static int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, + struct obd_export *dt_exp, + struct obd_export *md_exp, + struct lustre_md *md) { struct req_capsule *pill = &req->rq_pill; int rc; @@ -638,38 +605,30 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, } } } - rc = 0; + rc = 0; if (md->body->mbo_valid & OBD_MD_FLACL) { /* for ACL, it's possible that FLACL is set but aclsize is zero. * only when aclsize != 0 there's an actual segment for ACL * in reply buffer. */ - if (md->body->mbo_aclsize) { - rc = mdc_unpack_acl(req, md); - if (rc) - GOTO(out, rc); -#ifdef CONFIG_FS_POSIX_ACL - } else { - md->posix_acl = NULL; -#endif - } - } + rc = mdc_unpack_acl(req, md); + if (rc) + GOTO(out, rc); + } - EXIT; + EXIT; out: - if (rc) { -#ifdef CONFIG_FS_POSIX_ACL - posix_acl_release(md->posix_acl); -#endif - } - return rc; + if (rc) + lmd_clear_acl(md); + + return rc; } -int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md) +static int mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md) { - ENTRY; - RETURN(0); + ENTRY; + RETURN(0); } void mdc_replay_open(struct ptlrpc_request *req) @@ -679,11 +638,12 @@ void mdc_replay_open(struct ptlrpc_request *req) struct obd_client_handle *och; struct lustre_handle old_open_handle = { }; struct mdt_body *body; + struct ldlm_reply *rep; ENTRY; if (mod == NULL) { DEBUG_REQ(D_ERROR, req, - "Can't properly replay without open data."); + "cannot properly replay without open data"); EXIT; return; } @@ -691,6 +651,11 @@ void mdc_replay_open(struct ptlrpc_request *req) body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); LASSERT(body != NULL); + rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + if (rep != NULL && rep->lock_policy_res2 != 0) + DEBUG_REQ(D_ERROR, req, "Open request replay failed with %ld ", + (long int)rep->lock_policy_res2); + spin_lock(&req->rq_lock); och = mod->mod_och; if (och && och->och_open_handle.cookie) @@ -780,14 +745,14 @@ int mdc_set_open_replay_data(struct obd_export *exp, /* Outgoing messages always in my byte order. */ LASSERT(body != NULL); - /* Only if the import is replayable, we set replay_open data */ - if (och && imp->imp_replayable) { - mod = obd_mod_alloc(); - if (mod == NULL) { - DEBUG_REQ(D_ERROR, open_req, - "Can't allocate md_open_data"); - RETURN(0); - } + /* Only if the import is replayable, we set replay_open data */ + if (och && imp->imp_replayable) { + mod = obd_mod_alloc(); + if (mod == NULL) { + DEBUG_REQ(D_ERROR, open_req, + "cannot allocate md_open_data"); + RETURN(0); + } /** * Take a reference on \var mod, to be freed on mdc_close(). @@ -839,26 +804,27 @@ static void mdc_free_open(struct md_open_data *mod) * The worst thing is eviction if the client gets open lock **/ - DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "free open request rq_replay" - "= %d\n", mod->mod_open_req->rq_replay); + DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, + "free open request, rq_replay=%d", + mod->mod_open_req->rq_replay); ptlrpc_request_committed(mod->mod_open_req, committed); if (mod->mod_close_req) ptlrpc_request_committed(mod->mod_close_req, committed); } -int mdc_clear_open_replay_data(struct obd_export *exp, - struct obd_client_handle *och) +static int mdc_clear_open_replay_data(struct obd_export *exp, + struct obd_client_handle *och) { - struct md_open_data *mod = och->och_mod; - ENTRY; + struct md_open_data *mod = och->och_mod; + ENTRY; - /** - * It is possible to not have \var mod in a case of eviction between - * lookup and ll_file_open(). - **/ - if (mod == NULL) - RETURN(0); + /** + * It is possible to not have \var mod in a case of eviction between + * lookup and ll_file_open(). + **/ + if (mod == NULL) + RETURN(0); LASSERT(mod != LP_POISON); LASSERT(mod->mod_open_req != NULL); @@ -870,11 +836,11 @@ int mdc_clear_open_replay_data(struct obd_export *exp, spin_unlock(&mod->mod_open_req->rq_lock); mdc_free_open(mod); - mod->mod_och = NULL; - och->och_mod = NULL; - obd_mod_put(mod); + mod->mod_och = NULL; + och->och_mod = NULL; + obd_mod_put(mod); - RETURN(0); + RETURN(0); } static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, @@ -930,7 +896,7 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, mod->mod_close_req = req; - DEBUG_REQ(D_HA, mod->mod_open_req, "matched open"); + DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "matched open"); /* We no longer want to preserve this open for replay even * though the open was committed. b=3632, b=3633 */ spin_lock(&mod->mod_open_req->rq_lock); @@ -977,41 +943,41 @@ static int mdc_close(struct obd_export *exp, struct md_op_data *op_data, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); - - if (req->rq_repmsg == NULL) { - CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req, - req->rq_status); - if (rc == 0) - rc = req->rq_status ?: -EIO; - } else if (rc == 0 || rc == -EAGAIN) { - struct mdt_body *body; - - rc = lustre_msg_get_status(req->rq_repmsg); - if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { - DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err " - "= %d", rc); - if (rc > 0) - rc = -rc; - } - body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); - if (body == NULL) - rc = -EPROTO; - } else if (rc == -ESTALE) { - /** - * it can be allowed error after 3633 if open was committed and - * server failed before close was sent. Let's check if mod - * exists and return no error in that case - */ - if (mod) { - DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc); - LASSERT(mod->mod_open_req != NULL); - if (mod->mod_open_req->rq_committed) - rc = 0; - } - } + ptlrpc_put_mod_rpc_slot(req); + + if (req->rq_repmsg == NULL) { + CDEBUG(D_RPCTRACE, "request %p failed to send: rc = %d\n", req, + req->rq_status); + if (rc == 0) + rc = req->rq_status ?: -EIO; + } else if (rc == 0 || rc == -EAGAIN) { + struct mdt_body *body; + + rc = lustre_msg_get_status(req->rq_repmsg); + if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { + DEBUG_REQ(D_ERROR, req, + "type = PTL_RPC_MSG_ERR: rc = %d", rc); + if (rc > 0) + rc = -rc; + } + body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY); + if (body == NULL) + rc = -EPROTO; + } else if (rc == -ESTALE) { + /** + * it can be allowed error after 3633 if open was committed and + * server failed before close was sent. Let's check if mod + * exists and return no error in that case + */ + if (mod) { + DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc); + LASSERT(mod->mod_open_req != NULL); + if (mod->mod_open_req->rq_committed) + rc = 0; + } + } out: if (mod) { @@ -1033,14 +999,11 @@ static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid, struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; int i; - wait_queue_head_t waitq; int resends = 0; - struct l_wait_info lwi; int rc; ENTRY; *request = NULL; - init_waitqueue_head(&waitq); restart_bulk: req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE); @@ -1057,7 +1020,7 @@ restart_bulk: ptlrpc_at_set_req_timeout(req); desc = ptlrpc_prep_bulk_imp(req, npages, 1, - PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV, + PTLRPC_BULK_PUT_SINK, MDS_BULK_PORTAL, &ptlrpc_bulk_kiov_pin_ops); if (desc == NULL) { @@ -1085,9 +1048,12 @@ restart_bulk: exp->exp_obd->obd_name, -EIO); RETURN(-EIO); } - lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, - NULL); - l_wait_event(waitq, 0, &lwi); + + /* If a signal interrupts then the timeout returned will + * not be zero. In that case return -EINTR + */ + if (msleep_interruptible(resends * 1000)) + RETURN(-EINTR); goto restart_bulk; } @@ -1116,7 +1082,7 @@ static void mdc_release_page(struct page *page, int remove) if (remove) { lock_page(page); if (likely(page->mapping != NULL)) - truncate_complete_page(page->mapping, page); + delete_from_page_cache(page); unlock_page(page); } put_page(page); @@ -1132,16 +1098,17 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, */ unsigned long offset = hash_x_index(*hash, hash64); struct page *page; + unsigned long flags; int found; - xa_lock_irq(&mapping->i_pages); + ll_xa_lock_irqsave(&mapping->i_pages, flags); found = radix_tree_gang_lookup(&mapping->page_tree, (void **)&page, offset, 1); - if (found > 0 && !radix_tree_exceptional_entry(page)) { + if (found > 0 && !ll_xa_is_value(page)) { struct lu_dirpage *dp; get_page(page); - xa_unlock_irq(&mapping->i_pages); + ll_xa_unlock_irqrestore(&mapping->i_pages, flags); /* * In contrast to find_lock_page() we are sure that directory * page cannot be truncated (while DLM lock is held) and, @@ -1190,7 +1157,7 @@ static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, page = ERR_PTR(-EIO); } } else { - xa_unlock_irq(&mapping->i_pages); + ll_xa_unlock_irqrestore(&mapping->i_pages, flags); page = NULL; } return page; @@ -1252,12 +1219,12 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs) int i; for (i = 0; i < cfs_pgs; i++) { - struct lu_dirpage *dp = kmap(pages[i]); - struct lu_dirpage *first = dp; - struct lu_dirent *end_dirent = NULL; - struct lu_dirent *ent; - __u64 hash_end = le64_to_cpu(dp->ldp_hash_end); - __u32 flags = le32_to_cpu(dp->ldp_flags); + struct lu_dirpage *dp = kmap(pages[i]); + struct lu_dirpage *first = dp; + struct lu_dirent *end_dirent = NULL; + struct lu_dirent *ent; + __u64 hash_end = dp->ldp_hash_end; + __u32 flags = dp->ldp_flags; while (--lu_pgs > 0) { ent = lu_dirent_start(dp); @@ -1272,8 +1239,8 @@ static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs) break; /* Save the hash and flags of this lu_dirpage. */ - hash_end = le64_to_cpu(dp->ldp_hash_end); - flags = le32_to_cpu(dp->ldp_flags); + hash_end = dp->ldp_hash_end; + flags = dp->ldp_flags; /* Check if lu_dirpage contains no entries. */ if (end_dirent == NULL) @@ -1308,14 +1275,6 @@ struct readpage_param { struct md_callback *rp_cb; }; -#ifndef HAVE_DELETE_FROM_PAGE_CACHE -static inline void delete_from_page_cache(struct page *page) -{ - remove_from_page_cache(page); - put_page(page); -} -#endif - /** * Read pages from server. * @@ -1348,7 +1307,7 @@ static int mdc_read_page_remote(void *data, struct page *page0) fid = &op_data->op_fid1; LASSERT(inode != NULL); - OBD_ALLOC(page_pool, sizeof(page_pool[0]) * max_pages); + OBD_ALLOC_PTR_ARRAY(page_pool, max_pages); if (page_pool != NULL) { page_pool[0] = page0; } else { @@ -1357,8 +1316,7 @@ static int mdc_read_page_remote(void *data, struct page *page0) } for (npages = 1; npages < max_pages; npages++) { - page = __page_cache_alloc(mapping_gfp_mask(inode->i_mapping) - | __GFP_COLD); + page = page_cache_alloc(inode->i_mapping); if (page == NULL) break; page_pool[npages] = page; @@ -1418,7 +1376,7 @@ static int mdc_read_page_remote(void *data, struct page *page0) } if (page_pool != &page0) - OBD_FREE(page_pool, sizeof(page_pool[0]) * max_pages); + OBD_FREE_PTR_ARRAY(page_pool, max_pages); RETURN(rc); } @@ -1571,6 +1529,53 @@ fail: goto out_unlock; } +static int mdc_statfs_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *args, int rc) +{ + struct obd_info *oinfo = args; + struct obd_statfs *osfs; + + if (!rc) { + osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); + if (!osfs) + return -EPROTO; + + oinfo->oi_osfs = osfs; + + CDEBUG(D_CACHE, "blocks=%llu free=%llu avail=%llu " + "objects=%llu free=%llu state=%x\n", + osfs->os_blocks, osfs->os_bfree, osfs->os_bavail, + osfs->os_files, osfs->os_ffree, osfs->os_state); + } + + oinfo->oi_cb_up(oinfo, rc); + + return rc; +} + +static int mdc_statfs_async(struct obd_export *exp, + struct obd_info *oinfo, time64_t max_age, + struct ptlrpc_request_set *unused) +{ + struct ptlrpc_request *req; + struct obd_info *aa; + + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_STATFS, + LUSTRE_MDS_VERSION, MDS_STATFS); + if (req == NULL) + return -ENOMEM; + + ptlrpc_request_set_replen(req); + req->rq_interpret_reply = mdc_statfs_interpret; + + aa = ptlrpc_req_async_args(aa, req); + *aa = *oinfo; + + ptlrpcd_add_req(req); + + return 0; +} + static int mdc_statfs(const struct lu_env *env, struct obd_export *exp, struct obd_statfs *osfs, time64_t max_age, __u32 flags) @@ -1719,9 +1724,9 @@ static int mdc_ioc_hsm_progress(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); GOTO(out, rc); out: @@ -1777,6 +1782,7 @@ static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archive_count, *archive_array = archive_count; ptlrpc_request_set_replen(req); + req->rq_no_resend = 1; rc = mdc_queue_wait(req); GOTO(out, rc); @@ -1922,9 +1928,9 @@ static int mdc_ioc_hsm_state_set(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); GOTO(out, rc); out: @@ -1982,9 +1988,9 @@ static int mdc_ioc_hsm_request(struct obd_export *exp, ptlrpc_request_set_replen(req); - mdc_get_mod_rpc_slot(req, NULL); + ptlrpc_get_mod_rpc_slot(req); rc = ptlrpc_queue_wait(req); - mdc_put_mod_rpc_slot(req, NULL); + ptlrpc_put_mod_rpc_slot(req); GOTO(out, rc); @@ -1999,34 +2005,51 @@ static int mdc_ioc_hsm_ct_start(struct obd_export *exp, static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp, struct obd_quotactl *oqctl) { - struct ptlrpc_request *req; - struct obd_quotactl *oqc; - int rc; + struct ptlrpc_request *req; + struct obd_quotactl *oqc; + int rc; ENTRY; - req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), - &RQF_MDS_QUOTACTL, LUSTRE_MDS_VERSION, - MDS_QUOTACTL); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_QUOTACTL); if (req == NULL) RETURN(-ENOMEM); + + if (LUSTRE_Q_CMD_IS_POOL(oqctl->qc_cmd)) + req_capsule_set_size(&req->rq_pill, + &RMF_OBD_QUOTACTL, + RCL_CLIENT, + sizeof(*oqc) + LOV_MAXPOOLNAME + 1); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, + MDS_QUOTACTL); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL); - *oqc = *oqctl; + QCTL_COPY(oqc, oqctl); ptlrpc_request_set_replen(req); ptlrpc_at_set_req_timeout(req); rc = ptlrpc_queue_wait(req); - if (rc) - CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc); + if (rc) { + CERROR("%s: ptlrpc_queue_wait failed: rc = %d\n", + exp->exp_obd->obd_name, rc); + GOTO(out, rc); + } if (req->rq_repmsg && (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) { - *oqctl = *oqc; + QCTL_COPY(oqctl, oqc); } else if (!rc) { - CERROR ("Can't unpack obd_quotactl\n"); rc = -EPROTO; + CERROR("%s: cannot unpack obd_quotactl: rc = %d\n", + exp->exp_obd->obd_name, rc); } +out: ptlrpc_req_finished(req); RETURN(rc); @@ -2035,7 +2058,7 @@ static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp, static int mdc_ioc_swap_layouts(struct obd_export *exp, struct md_op_data *op_data) { - struct list_head cancels = LIST_HEAD_INIT(cancels); + LIST_HEAD(cancels); struct ptlrpc_request *req; int rc, count; struct mdc_swap_layouts *msl, *payload; @@ -2526,6 +2549,81 @@ static int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } +struct mdc_rmfid_args { + int *mra_rcs; + int mra_nr; +}; + +int mdc_rmfid_interpret(const struct lu_env *env, struct ptlrpc_request *req, + void *args, int rc) +{ + struct mdc_rmfid_args *aa; + int *rcs, size; + ENTRY; + + if (!rc) { + aa = ptlrpc_req_async_args(aa, req); + + size = req_capsule_get_size(&req->rq_pill, &RMF_RCS, + RCL_SERVER); + LASSERT(size == sizeof(int) * aa->mra_nr); + rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS); + LASSERT(rcs); + LASSERT(aa->mra_rcs); + LASSERT(aa->mra_nr); + memcpy(aa->mra_rcs, rcs, size); + } + + RETURN(rc); +} + +static int mdc_rmfid(struct obd_export *exp, struct fid_array *fa, + int *rcs, struct ptlrpc_request_set *set) +{ + struct ptlrpc_request *req; + struct mdc_rmfid_args *aa; + struct mdt_body *b; + struct lu_fid *tmp; + int rc, flen; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_RMFID); + if (req == NULL) + RETURN(-ENOMEM); + + flen = fa->fa_nr * sizeof(struct lu_fid); + req_capsule_set_size(&req->rq_pill, &RMF_FID_ARRAY, + RCL_CLIENT, flen); + req_capsule_set_size(&req->rq_pill, &RMF_FID_ARRAY, + RCL_SERVER, flen); + req_capsule_set_size(&req->rq_pill, &RMF_RCS, + RCL_SERVER, fa->fa_nr * sizeof(__u32)); + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_RMFID); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + tmp = req_capsule_client_get(&req->rq_pill, &RMF_FID_ARRAY); + memcpy(tmp, fa->fa_fids, flen); + + mdc_pack_body(req, NULL, 0, 0, -1, 0); + b = req_capsule_client_get(&req->rq_pill, &RMF_MDT_BODY); + b->mbo_ctime = ktime_get_real_seconds(); + + ptlrpc_request_set_replen(req); + + LASSERT(rcs); + aa = ptlrpc_req_async_args(aa, req); + aa->mra_rcs = rcs; + aa->mra_nr = fa->fa_nr; + req->rq_interpret_reply = mdc_rmfid_interpret; + + ptlrpc_set_add_req(set, req); + ptlrpc_check_set(NULL, set); + + RETURN(rc); +} + static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, enum obd_import_event event) { @@ -2783,67 +2881,97 @@ static int mdc_cleanup(struct obd_device *obd) return osc_cleanup_common(obd); } -static struct obd_ops mdc_obd_ops = { - .o_owner = THIS_MODULE, - .o_setup = mdc_setup, - .o_precleanup = mdc_precleanup, - .o_cleanup = mdc_cleanup, - .o_add_conn = client_import_add_conn, - .o_del_conn = client_import_del_conn, - .o_connect = client_connect_import, +static const struct obd_ops mdc_obd_ops = { + .o_owner = THIS_MODULE, + .o_setup = mdc_setup, + .o_precleanup = mdc_precleanup, + .o_cleanup = mdc_cleanup, + .o_add_conn = client_import_add_conn, + .o_del_conn = client_import_del_conn, + .o_connect = client_connect_import, .o_reconnect = osc_reconnect, .o_disconnect = osc_disconnect, - .o_iocontrol = mdc_iocontrol, - .o_set_info_async = mdc_set_info_async, - .o_statfs = mdc_statfs, + .o_iocontrol = mdc_iocontrol, + .o_set_info_async = mdc_set_info_async, + .o_statfs = mdc_statfs, + .o_statfs_async = mdc_statfs_async, .o_fid_init = client_fid_init, .o_fid_fini = client_fid_fini, - .o_fid_alloc = mdc_fid_alloc, - .o_import_event = mdc_import_event, - .o_get_info = mdc_get_info, - .o_get_uuid = mdc_get_uuid, - .o_quotactl = mdc_quotactl, + .o_fid_alloc = mdc_fid_alloc, + .o_import_event = mdc_import_event, + .o_get_info = mdc_get_info, + .o_get_uuid = mdc_get_uuid, + .o_quotactl = mdc_quotactl, }; -static struct md_ops mdc_md_ops = { +static const struct md_ops mdc_md_ops = { .m_get_root = mdc_get_root, - .m_null_inode = mdc_null_inode, - .m_close = mdc_close, - .m_create = mdc_create, - .m_enqueue = mdc_enqueue, - .m_getattr = mdc_getattr, - .m_getattr_name = mdc_getattr_name, - .m_intent_lock = mdc_intent_lock, - .m_link = mdc_link, - .m_rename = mdc_rename, - .m_setattr = mdc_setattr, - .m_setxattr = mdc_setxattr, - .m_getxattr = mdc_getxattr, + .m_null_inode = mdc_null_inode, + .m_close = mdc_close, + .m_create = mdc_create, + .m_enqueue = mdc_enqueue, + .m_getattr = mdc_getattr, + .m_getattr_name = mdc_getattr_name, + .m_intent_lock = mdc_intent_lock, + .m_link = mdc_link, + .m_rename = mdc_rename, + .m_setattr = mdc_setattr, + .m_setxattr = mdc_setxattr, + .m_getxattr = mdc_getxattr, .m_fsync = mdc_fsync, .m_file_resync = mdc_file_resync, .m_read_page = mdc_read_page, - .m_unlink = mdc_unlink, - .m_cancel_unused = mdc_cancel_unused, - .m_init_ea_size = mdc_init_ea_size, - .m_set_lock_data = mdc_set_lock_data, - .m_lock_match = mdc_lock_match, - .m_get_lustre_md = mdc_get_lustre_md, - .m_free_lustre_md = mdc_free_lustre_md, - .m_set_open_replay_data = mdc_set_open_replay_data, - .m_clear_open_replay_data = mdc_clear_open_replay_data, - .m_intent_getattr_async = mdc_intent_getattr_async, - .m_revalidate_lock = mdc_revalidate_lock + .m_unlink = mdc_unlink, + .m_cancel_unused = mdc_cancel_unused, + .m_init_ea_size = mdc_init_ea_size, + .m_set_lock_data = mdc_set_lock_data, + .m_lock_match = mdc_lock_match, + .m_get_lustre_md = mdc_get_lustre_md, + .m_free_lustre_md = mdc_free_lustre_md, + .m_set_open_replay_data = mdc_set_open_replay_data, + .m_clear_open_replay_data = mdc_clear_open_replay_data, + .m_intent_getattr_async = mdc_intent_getattr_async, + .m_revalidate_lock = mdc_revalidate_lock, + .m_rmfid = mdc_rmfid, }; +dev_t mdc_changelog_dev; +struct class *mdc_changelog_class; static int __init mdc_init(void) { - return class_register_type(&mdc_obd_ops, &mdc_md_ops, true, NULL, - LUSTRE_MDC_NAME, &mdc_device_type); + int rc = 0; + rc = alloc_chrdev_region(&mdc_changelog_dev, 0, + MDC_CHANGELOG_DEV_COUNT, + MDC_CHANGELOG_DEV_NAME); + if (rc) + return rc; + + mdc_changelog_class = class_create(THIS_MODULE, MDC_CHANGELOG_DEV_NAME); + if (IS_ERR(mdc_changelog_class)) { + rc = PTR_ERR(mdc_changelog_class); + goto out_dev; + } + + rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, true, + LUSTRE_MDC_NAME, &mdc_device_type); + if (rc) + goto out_class; + + return 0; + +out_class: + class_destroy(mdc_changelog_class); +out_dev: + unregister_chrdev_region(mdc_changelog_dev, MDC_CHANGELOG_DEV_COUNT); + return rc; } static void __exit mdc_exit(void) { - class_unregister_type(LUSTRE_MDC_NAME); + class_unregister_type(LUSTRE_MDC_NAME); + class_destroy(mdc_changelog_class); + unregister_chrdev_region(mdc_changelog_dev, MDC_CHANGELOG_DEV_COUNT); + idr_destroy(&mdc_changelog_minor_idr); } MODULE_AUTHOR("OpenSFS, Inc. ");