X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_request.c;h=9dc00ce0846f4266c20021b453406c55ad0c0af3;hp=4dd35b60a3a26fc57c5f7150f20d77d312bf4949;hb=8d161d44214f907a9f2d9cf5a79cd2c83de995c3;hpb=aa37f159e721723c3a719592c4d75df4c774b46c diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 4dd35b6..9dc00ce 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Whamcloud, Inc. + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -41,17 +41,21 @@ # include # include # include +# include #else # include #endif #include +#include #include +#include #include -#include #include #include #include +#include +#include #include "mdc_internal.h" @@ -87,6 +91,24 @@ int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req, } } +static inline int mdc_queue_wait(struct ptlrpc_request *req) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + int rc; + + /* obd_get_request_slot() ensures that this client has no more + * than cl_max_rpcs_in_flight RPCs simultaneously inf light + * against an MDT. */ + rc = obd_get_request_slot(cli); + if (rc != 0) + return rc; + + rc = ptlrpc_queue_wait(req); + obd_put_request_slot(cli); + + return rc; +} + /* Helper that implements most of mdc_getstatus and signal_completed_replay. 
*/ /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */ static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid, @@ -172,14 +194,13 @@ static int mdc_getattr_common(struct obd_export *exp, CDEBUG(D_NET, "mode: %o\n", body->mode); - if (body->eadatasize != 0) { - mdc_update_max_ea_from_body(exp, body); - - eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD, - body->eadatasize); - if (eadata == NULL) - RETURN(-EPROTO); - } + mdc_update_max_ea_from_body(exp, body); + if (body->eadatasize != 0) { + eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD, + body->eadatasize); + if (eadata == NULL) + RETURN(-EPROTO); + } if (body->valid & OBD_MD_FLRMTPERM) { struct mdt_remote_perm *perm; @@ -208,6 +229,11 @@ int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data, int rc; ENTRY; + /* Single MDS without an LMV case */ + if (op_data->op_flags & MF_GET_MDT_IDX) { + op_data->op_mds = 0; + RETURN(0); + } *request = NULL; req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR); if (req == NULL) @@ -345,27 +371,45 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, input_size); } - rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + /* Flush local XATTR locks to get rid of a possible cancel RPC */ + if (opcode == MDS_REINT && fid_is_sane(fid) && + exp->exp_connect_data.ocd_ibits_known & MDS_INODELOCK_XATTR) { + struct list_head cancels = LIST_HEAD_INIT(cancels); + int count; + + /* Without that packing would fail */ + if (input_size == 0) + req_capsule_set_size(&req->rq_pill, &RMF_EADATA, + RCL_CLIENT, 0); + + count = mdc_resource_get_unused(exp, fid, + &cancels, LCK_EX, + MDS_INODELOCK_XATTR); + + rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + } else { + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, opcode); + if (rc) { + 
ptlrpc_request_free(req); + RETURN(rc); + } + } if (opcode == MDS_REINT) { struct mdt_rec_setxattr *rec; CLASSERT(sizeof(struct mdt_rec_setxattr) == sizeof(struct mdt_rec_reint)); - rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT); - rec->sx_opcode = REINT_SETXATTR; - /* TODO: - * cfs_curproc_fs{u,g}id() should replace - * current->fs{u,g}id for portability. - */ - rec->sx_fsuid = cfs_curproc_fsuid(); - rec->sx_fsgid = cfs_curproc_fsgid(); - rec->sx_cap = cfs_curproc_cap_pack(); - rec->sx_suppgid1 = suppgid; + rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT); + rec->sx_opcode = REINT_SETXATTR; + rec->sx_fsuid = from_kuid(&init_user_ns, current_fsuid()); + rec->sx_fsgid = from_kgid(&init_user_ns, current_fsgid()); + rec->sx_cap = cfs_curproc_cap_pack(); + rec->sx_suppgid1 = suppgid; rec->sx_suppgid2 = -1; rec->sx_fid = *fid; rec->sx_valid = valid | OBD_MD_FLCTIME; @@ -448,7 +492,7 @@ static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md) if (!buf) RETURN(-EPROTO); - acl = posix_acl_from_xattr(buf, body->aclsize); + acl = posix_acl_from_xattr(&init_user_ns, buf, body->aclsize); if (IS_ERR(acl)) { rc = PTR_ERR(acl); CERROR("convert xattr to acl: %d\n", rc); @@ -529,25 +573,25 @@ int mdc_get_lustre_md(struct obd_export *exp, struct ptlrpc_request *req, "but eadatasize 0\n"); RETURN(-EPROTO); } - if (md->body->valid & OBD_MD_MEA) { - lmvsize = md->body->eadatasize; - lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD, - lmvsize); - if (!lmv) - GOTO(out, rc = -EPROTO); - - rc = obd_unpackmd(md_exp, (void *)&md->mea, lmv, - lmvsize); - if (rc < 0) - GOTO(out, rc); - - if (rc < sizeof(*md->mea)) { - CDEBUG(D_INFO, "size too small: " - "rc < sizeof(*md->mea) (%d < %d)\n", - rc, (int)sizeof(*md->mea)); - GOTO(out, rc = -EPROTO); - } - } + if (md->body->valid & OBD_MD_MEA) { + lmvsize = md->body->eadatasize; + lmv = req_capsule_server_sized_get(pill, &RMF_MDT_MD, + lmvsize); + if (!lmv) + GOTO(out, rc = -EPROTO); + + rc = 
obd_unpackmd(md_exp, (void *)&md->lmv, lmv, + lmvsize); + if (rc < 0) + GOTO(out, rc); + + if (rc < sizeof(*md->lmv)) { + CDEBUG(D_INFO, "size too small: " + "rc < sizeof(*md->lmv) (%d < %d)\n", + rc, (int)sizeof(*md->lmv)); + GOTO(out, rc = -EPROTO); + } + } } rc = 0; @@ -692,22 +736,23 @@ void mdc_commit_open(struct ptlrpc_request *req) * be put along with freeing \var mod. */ ptlrpc_request_addref(req); - cfs_spin_lock(&req->rq_lock); - req->rq_committed = 1; - cfs_spin_unlock(&req->rq_lock); - req->rq_cb_data = NULL; - obd_mod_put(mod); + spin_lock(&req->rq_lock); + req->rq_committed = 1; + spin_unlock(&req->rq_lock); + req->rq_cb_data = NULL; + obd_mod_put(mod); } int mdc_set_open_replay_data(struct obd_export *exp, - struct obd_client_handle *och, - struct ptlrpc_request *open_req) + struct obd_client_handle *och, + struct lookup_intent *it) { - struct md_open_data *mod; - struct mdt_rec_create *rec; - struct mdt_body *body; - struct obd_import *imp = open_req->rq_import; - ENTRY; + struct md_open_data *mod; + struct mdt_rec_create *rec; + struct mdt_body *body; + struct ptlrpc_request *open_req = it->d.lustre.it_data; + struct obd_import *imp = open_req->rq_import; + ENTRY; if (!open_req->rq_replay) RETURN(0); @@ -737,13 +782,15 @@ int mdc_set_open_replay_data(struct obd_export *exp, obd_mod_get(mod); obd_mod_get(mod); - cfs_spin_lock(&open_req->rq_lock); - och->och_mod = mod; - mod->mod_och = och; - mod->mod_open_req = open_req; - open_req->rq_cb_data = mod; - open_req->rq_commit_cb = mdc_commit_open; - cfs_spin_unlock(&open_req->rq_lock); + spin_lock(&open_req->rq_lock); + och->och_mod = mod; + mod->mod_och = och; + mod->mod_is_create = it_disposition(it, DISP_OPEN_CREATE) || + it_disposition(it, DISP_OPEN_STRIPE); + mod->mod_open_req = open_req; + open_req->rq_cb_data = mod; + open_req->rq_commit_cb = mdc_commit_open; + spin_unlock(&open_req->rq_lock); } rec->cr_fid2 = body->fid1; @@ -760,6 +807,23 @@ int mdc_set_open_replay_data(struct obd_export *exp, 
RETURN(0); } +static void mdc_free_open(struct md_open_data *mod) +{ + int committed = 0; + + if (mod->mod_is_create == 0 && + imp_connect_disp_stripe(mod->mod_open_req->rq_import)) + committed = 1; + + LASSERT(mod->mod_open_req->rq_replay == 0); + + DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "free open request\n"); + + ptlrpc_request_committed(mod->mod_open_req, committed); + if (mod->mod_close_req) + ptlrpc_request_committed(mod->mod_close_req, committed); +} + int mdc_clear_open_replay_data(struct obd_export *exp, struct obd_client_handle *och) { @@ -774,6 +838,8 @@ int mdc_clear_open_replay_data(struct obd_export *exp, RETURN(0); LASSERT(mod != LP_POISON); + LASSERT(mod->mod_open_req != NULL); + mdc_free_open(mod); mod->mod_och = NULL; och->och_mod = NULL; @@ -801,13 +867,29 @@ static void mdc_close_handle_reply(struct ptlrpc_request *req, int mdc_close(struct obd_export *exp, struct md_op_data *op_data, struct md_open_data *mod, struct ptlrpc_request **request) { - struct obd_device *obd = class_exp2obd(exp); - struct ptlrpc_request *req; - int rc; - ENTRY; + struct obd_device *obd = class_exp2obd(exp); + struct ptlrpc_request *req; + struct req_format *req_fmt; + int rc; + int saved_rc = 0; + ENTRY; + + req_fmt = &RQF_MDS_CLOSE; + if (op_data->op_bias & MDS_HSM_RELEASE) { + req_fmt = &RQF_MDS_RELEASE_CLOSE; + + /* allocate a FID for volatile file */ + rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data); + if (rc < 0) { + CERROR("%s: "DFID" failed to allocate FID: %d\n", + obd->obd_name, PFID(&op_data->op_fid1), rc); + /* save the errcode and proceed to close */ + saved_rc = rc; + } + } - *request = NULL; - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_CLOSE); + *request = NULL; + req = ptlrpc_request_alloc(class_exp2cliimp(exp), req_fmt); if (req == NULL) RETURN(-ENOMEM); @@ -836,19 +918,19 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, DEBUG_REQ(D_HA, mod->mod_open_req, "matched open"); /* We no longer want to preserve 
this open for replay even * though the open was committed. b=3632, b=3633 */ - cfs_spin_lock(&mod->mod_open_req->rq_lock); - mod->mod_open_req->rq_replay = 0; - cfs_spin_unlock(&mod->mod_open_req->rq_lock); + spin_lock(&mod->mod_open_req->rq_lock); + mod->mod_open_req->rq_replay = 0; + spin_unlock(&mod->mod_open_req->rq_lock); } else { CDEBUG(D_HA, "couldn't find open req; expecting close error\n"); } mdc_close_pack(req, op_data); - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, - obd->u.cli.cl_max_mds_easize); - req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER, - obd->u.cli.cl_max_mds_cookiesize); + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + obd->u.cli.cl_default_mds_easize); + req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER, + obd->u.cli.cl_default_mds_cookiesize); ptlrpc_request_set_replen(req); @@ -897,7 +979,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, } *request = req; mdc_close_handle_reply(req, op_data, rc); - RETURN(rc); + RETURN(rc < 0 ? rc : saved_rc); } int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data, @@ -929,9 +1011,9 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data, DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr"); /* We no longer want to preserve this setattr for replay even * though the open was committed. 
b=3632, b=3633 */ - cfs_spin_lock(&mod->mod_open_req->rq_lock); - mod->mod_open_req->rq_replay = 0; - cfs_spin_unlock(&mod->mod_open_req->rq_lock); + spin_lock(&mod->mod_open_req->rq_lock); + mod->mod_open_req->rq_replay = 0; + spin_unlock(&mod->mod_open_req->rq_lock); } mdc_close_pack(req, op_data); @@ -957,6 +1039,9 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data, if (mod) { if (rc != 0) mod->mod_close_req = NULL; + LASSERT(mod->mod_open_req != NULL); + mdc_free_open(mod); + /* Since now, mod is accessed through setattr req only, * thus DW req does not keep a reference on mod anymore. */ obd_mod_put(mod); @@ -992,9 +1077,9 @@ int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid, req->rq_request_portal = MDS_READPAGE_PORTAL; ptlrpc_at_set_req_timeout(req); - desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL); - if (desc == NULL) - GOTO(out, rc = -ENOMEM); + desc = ptlrpc_prep_bulk_imp(req, 1, 1,BULK_GET_SOURCE, MDS_BULK_PORTAL); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); /* NB req now owns desc and will free it when it gets freed. 
*/ ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset); @@ -1013,89 +1098,691 @@ out: EXPORT_SYMBOL(mdc_sendpage); #endif -int mdc_readpage(struct obd_export *exp, struct md_op_data *op_data, - struct page **pages, struct ptlrpc_request **request) +static int mdc_getpage(struct obd_export *exp, const struct lu_fid *fid, + __u64 offset, struct obd_capa *oc, + struct page **pages, int npages, + struct ptlrpc_request **request) { - struct ptlrpc_request *req; - struct ptlrpc_bulk_desc *desc; - int i; - cfs_waitq_t waitq; - int resends = 0; - struct l_wait_info lwi; - int rc; - ENTRY; - - *request = NULL; - cfs_waitq_init(&waitq); + struct ptlrpc_request *req; + struct ptlrpc_bulk_desc *desc; + int i; + wait_queue_head_t waitq; + int resends = 0; + struct l_wait_info lwi; + int rc; + ENTRY; + + *request = NULL; + init_waitqueue_head(&waitq); restart_bulk: - req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE); - if (req == NULL) - RETURN(-ENOMEM); + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE); + if (req == NULL) + RETURN(-ENOMEM); - mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + mdc_set_capa_size(req, &RMF_CAPA1, oc); - rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE); - if (rc) { - ptlrpc_request_free(req); - RETURN(rc); - } + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } - req->rq_request_portal = MDS_READPAGE_PORTAL; - ptlrpc_at_set_req_timeout(req); + req->rq_request_portal = MDS_READPAGE_PORTAL; + ptlrpc_at_set_req_timeout(req); - desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, BULK_PUT_SINK, - MDS_BULK_PORTAL); - if (desc == NULL) { - ptlrpc_request_free(req); - RETURN(-ENOMEM); - } + desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK, + MDS_BULK_PORTAL); + if (desc == NULL) { + ptlrpc_request_free(req); + RETURN(-ENOMEM); + } - /* NB req now owns desc and will free it when it gets freed */ - for (i = 
0; i < op_data->op_npages; i++) - ptlrpc_prep_bulk_page(desc, pages[i], 0, CFS_PAGE_SIZE); + /* NB req now owns desc and will free it when it gets freed */ + for (i = 0; i < npages; i++) + ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE); - mdc_readdir_pack(req, op_data->op_offset, - CFS_PAGE_SIZE * op_data->op_npages, - &op_data->op_fid1, op_data->op_capa1); + mdc_readdir_pack(req, offset, PAGE_CACHE_SIZE * npages, fid, oc); - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc) { - ptlrpc_req_finished(req); - if (rc != -ETIMEDOUT) - RETURN(rc); + ptlrpc_request_set_replen(req); + rc = ptlrpc_queue_wait(req); + if (rc) { + ptlrpc_req_finished(req); + if (rc != -ETIMEDOUT) + RETURN(rc); + + resends++; + if (!client_should_resend(resends, &exp->exp_obd->u.cli)) { + CERROR("%s: too many resend retries: rc = %d\n", + exp->exp_obd->obd_name, -EIO); + RETURN(-EIO); + } + lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, + NULL); + l_wait_event(waitq, 0, &lwi); + + goto restart_bulk; + } - resends++; - if (!client_should_resend(resends, &exp->exp_obd->u.cli)) { - CERROR("too many resend retries, returning error\n"); - RETURN(-EIO); - } - lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL); - l_wait_event(waitq, 0, &lwi); + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, + req->rq_bulk->bd_nob_transferred); + if (rc < 0) { + ptlrpc_req_finished(req); + RETURN(rc); + } - goto restart_bulk; - } + if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) { + CERROR("%s: unexpected bytes transferred: %d (%ld expected)\n", + exp->exp_obd->obd_name, req->rq_bulk->bd_nob_transferred, + PAGE_CACHE_SIZE * npages); + ptlrpc_req_finished(req); + RETURN(-EPROTO); + } - rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, - req->rq_bulk->bd_nob_transferred); - if (rc < 0) { - ptlrpc_req_finished(req); - RETURN(rc); - } + *request = req; + RETURN(0); +} - if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) { - 
CERROR("Unexpected # bytes transferred: %d (%ld expected)\n", - req->rq_bulk->bd_nob_transferred, - CFS_PAGE_SIZE * op_data->op_npages); - ptlrpc_req_finished(req); - RETURN(-EPROTO); - } +#ifdef __KERNEL__ +static void mdc_release_page(struct page *page, int remove) +{ + if (remove) { + lock_page(page); + if (likely(page->mapping != NULL)) + truncate_complete_page(page->mapping, page); + unlock_page(page); + } + page_cache_release(page); +} - *request = req; - RETURN(0); +static struct page *mdc_page_locate(struct address_space *mapping, __u64 *hash, + __u64 *start, __u64 *end, int hash64) +{ + /* + * Complement of hash is used as an index so that + * radix_tree_gang_lookup() can be used to find a page with starting + * hash _smaller_ than one we are looking for. + */ + unsigned long offset = hash_x_index(*hash, hash64); + struct page *page; + int found; + + spin_lock_irq(&mapping->tree_lock); + found = radix_tree_gang_lookup(&mapping->page_tree, + (void **)&page, offset, 1); + if (found > 0) { + struct lu_dirpage *dp; + + page_cache_get(page); + spin_unlock_irq(&mapping->tree_lock); + /* + * In contrast to find_lock_page() we are sure that directory + * page cannot be truncated (while DLM lock is held) and, + * hence, can avoid restart. + * + * In fact, page cannot be locked here at all, because + * mdc_read_page_remote does synchronous io. 
+ */ + wait_on_page_locked(page); + if (PageUptodate(page)) { + dp = kmap(page); + if (BITS_PER_LONG == 32 && hash64) { + *start = le64_to_cpu(dp->ldp_hash_start) >> 32; + *end = le64_to_cpu(dp->ldp_hash_end) >> 32; + *hash = *hash >> 32; + } else { + *start = le64_to_cpu(dp->ldp_hash_start); + *end = le64_to_cpu(dp->ldp_hash_end); + } + if (unlikely(*start == 1 && *hash == 0)) + *hash = *start; + else + LASSERTF(*start <= *hash, "start = "LPX64 + ",end = "LPX64",hash = "LPX64"\n", + *start, *end, *hash); + CDEBUG(D_VFSTRACE, "page%lu [%llu %llu], hash"LPU64"\n", + offset, *start, *end, *hash); + if (*hash > *end) { + kunmap(page); + mdc_release_page(page, 0); + page = NULL; + } else if (*end != *start && *hash == *end) { + /* + * upon hash collision, remove this page, + * otherwise put page reference, and + * ll_get_dir_page() will issue RPC to fetch + * the page we want. + */ + kunmap(page); + mdc_release_page(page, + le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); + page = NULL; + } + } else { + page_cache_release(page); + page = ERR_PTR(-EIO); + } + } else { + spin_unlock_irq(&mapping->tree_lock); + page = NULL; + } + return page; +} + +/* + * Adjust a set of pages, each page containing an array of lu_dirpages, + * so that each page can be used as a single logical lu_dirpage. + * + * A lu_dirpage is laid out as follows, where s = ldp_hash_start, + * e = ldp_hash_end, f = ldp_flags, p = padding, and each "ent" is a + * struct lu_dirent. It has size up to LU_PAGE_SIZE. The ldp_hash_end + * value is used as a cookie to request the next lu_dirpage in a + * directory listing that spans multiple pages (two in this example): + * ________ + * | | + * .|--------v------- -----. + * |s|e|f|p|ent|ent| ... |ent| + * '--|-------------- -----' Each CFS_PAGE contains a single + * '------. lu_dirpage. + * .---------v------- -----. + * |s|e|f|p|ent| 0 | ... 
| 0 | + * '----------------- -----' + * + * However, on hosts where the native VM page size (PAGE_CACHE_SIZE) is + * larger than LU_PAGE_SIZE, a single host page may contain multiple + * lu_dirpages. After reading the lu_dirpages from the MDS, the + * ldp_hash_end of the first lu_dirpage refers to the one immediately + * after it in the same CFS_PAGE (arrows simplified for brevity, but + * in general e0==s1, e1==s2, etc.): + * + * .-------------------- -----. + * |s0|e0|f0|p|ent|ent| ... |ent| + * |---v---------------- -----| + * |s1|e1|f1|p|ent|ent| ... |ent| + * |---v---------------- -----| Here, each CFS_PAGE contains + * ... multiple lu_dirpages. + * |---v---------------- -----| + * |s'|e'|f'|p|ent|ent| ... |ent| + * '---|---------------- -----' + * v + * .----------------------------. + * | next CFS_PAGE | + * + * This structure is transformed into a single logical lu_dirpage as follows: + * + * - Replace e0 with e' so the request for the next lu_dirpage gets the page + * labeled 'next CFS_PAGE'. + * + * - Copy the LDF_COLLIDE flag from f' to f0 to correctly reflect whether + * a hash collision with the next page exists. + * + * - Adjust the lde_reclen of the ending entry of each lu_dirpage to span + * to the first entry of the next lu_dirpage. + */ +#if PAGE_CACHE_SIZE > LU_PAGE_SIZE +static void mdc_adjust_dirpages(struct page **pages, int cfs_pgs, int lu_pgs) +{ + int i; + + for (i = 0; i < cfs_pgs; i++) { + struct lu_dirpage *dp = kmap(pages[i]); + struct lu_dirpage *first = dp; + struct lu_dirent *end_dirent = NULL; + struct lu_dirent *ent; + __u64 hash_end = le64_to_cpu(dp->ldp_hash_end); + __u32 flags = le32_to_cpu(dp->ldp_flags); + + while (--lu_pgs > 0) { + ent = lu_dirent_start(dp); + for (end_dirent = ent; ent != NULL; + end_dirent = ent, ent = lu_dirent_next(ent)); + + /* Advance dp to next lu_dirpage. */ + dp = (struct lu_dirpage *)((char *)dp + LU_PAGE_SIZE); + + /* Check if we've reached the end of the CFS_PAGE. 
*/ + if (!((unsigned long)dp & ~CFS_PAGE_MASK)) + break; + + /* Save the hash and flags of this lu_dirpage. */ + hash_end = le64_to_cpu(dp->ldp_hash_end); + flags = le32_to_cpu(dp->ldp_flags); + + /* Check if lu_dirpage contains no entries. */ + if (end_dirent == NULL) + break; + + /* Enlarge the end entry lde_reclen from 0 to + * first entry of next lu_dirpage. */ + LASSERT(le16_to_cpu(end_dirent->lde_reclen) == 0); + end_dirent->lde_reclen = + cpu_to_le16((char *)(dp->ldp_entries) - + (char *)end_dirent); + } + + first->ldp_hash_end = hash_end; + first->ldp_flags &= ~cpu_to_le32(LDF_COLLIDE); + first->ldp_flags |= flags & cpu_to_le32(LDF_COLLIDE); + + kunmap(pages[i]); + } + LASSERTF(lu_pgs == 0, "left = %d", lu_pgs); +} +#else +#define mdc_adjust_dirpages(pages, cfs_pgs, lu_pgs) do {} while (0) +#endif /* PAGE_CACHE_SIZE > LU_PAGE_SIZE */ + +/* parameters for readdir page */ +struct readpage_param { + struct md_op_data *rp_mod; + __u64 rp_off; + int rp_hash64; + struct obd_export *rp_exp; + struct md_callback *rp_cb; +}; + +/** + * Read pages from server. + * + * Page in MDS_READPAGE RPC is packed in LU_PAGE_SIZE, and each page contains + * a header lu_dirpage which describes the start/end hash, and whether this + * page is empty (contains no dir entry) or hash collide with next page. + * After client receives reply, several pages will be integrated into dir page + * in CFS_PAGE_SIZE (if CFS_PAGE_SIZE greater than LU_PAGE_SIZE), and the + * lu_dirpage for this integrated page will be adjusted. 
+ **/ +static int mdc_read_page_remote(void *data, struct page *page0) +{ + struct readpage_param *rp = data; + struct page **page_pool; + struct page *page; + struct lu_dirpage *dp; + int rd_pgs = 0; /* number of pages read actually */ + int npages; + struct md_op_data *op_data = rp->rp_mod; + struct ptlrpc_request *req; + int max_pages = op_data->op_max_pages; + struct inode *inode; + struct lu_fid *fid; + int i; + int rc; + ENTRY; + + LASSERT(max_pages > 0 && max_pages <= PTLRPC_MAX_BRW_PAGES); + if (op_data->op_mea1 != NULL) { + __u32 index = op_data->op_stripe_offset; + + inode = op_data->op_mea1->lsm_md_oinfo[index].lmo_root; + fid = &op_data->op_mea1->lsm_md_oinfo[index].lmo_fid; + } else { + inode = op_data->op_data; + fid = &op_data->op_fid1; + } + LASSERT(inode != NULL); + + OBD_ALLOC(page_pool, sizeof(page_pool[0]) * max_pages); + if (page_pool != NULL) { + page_pool[0] = page0; + } else { + page_pool = &page0; + max_pages = 1; + } + + for (npages = 1; npages < max_pages; npages++) { + page = page_cache_alloc_cold(inode->i_mapping); + if (page == NULL) + break; + page_pool[npages] = page; + } + + rc = mdc_getpage(rp->rp_exp, fid, rp->rp_off, op_data->op_capa1, + page_pool, npages, &req); + if (rc == 0) { + int lu_pgs; + + rd_pgs = (req->rq_bulk->bd_nob_transferred + + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; + lu_pgs = req->rq_bulk->bd_nob_transferred >> + LU_PAGE_SHIFT; + LASSERT(!(req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK)); + + CDEBUG(D_INODE, "read %d(%d)/%d pages\n", rd_pgs, lu_pgs, + op_data->op_npages); + + mdc_adjust_dirpages(page_pool, rd_pgs, lu_pgs); + + SetPageUptodate(page0); + } + + unlock_page(page0); + ptlrpc_req_finished(req); + CDEBUG(D_CACHE, "read %d/%d pages\n", rd_pgs, npages); + for (i = 1; i < npages; i++) { + unsigned long offset; + __u64 hash; + int ret; + + page = page_pool[i]; + + if (rc < 0 || i >= rd_pgs) { + page_cache_release(page); + continue; + } + + SetPageUptodate(page); + + dp = kmap(page); + hash = 
le64_to_cpu(dp->ldp_hash_start); + kunmap(page); + + offset = hash_x_index(hash, rp->rp_hash64); + + prefetchw(&page->flags); + ret = add_to_page_cache_lru(page, inode->i_mapping, offset, + GFP_KERNEL); + if (ret == 0) + unlock_page(page); + else + CDEBUG(D_VFSTRACE, "page %lu add to page cache failed:" + " rc = %d\n", offset, ret); + page_cache_release(page); + } + + if (page_pool != &page0) + OBD_FREE(page_pool, sizeof(page_pool[0]) * max_pages); + + RETURN(rc); +} + +/** + * Read dir page from cache first, if it can not find it, read it from + * server and add into the cache. + */ +static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, struct page **ppage) +{ + struct lookup_intent it = { .it_op = IT_READDIR }; + struct page *page; + struct inode *dir = NULL; + struct address_space *mapping; + struct lu_dirpage *dp; + __u64 start = 0; + __u64 end = 0; + struct lustre_handle lockh; + struct ptlrpc_request *enq_req = NULL; + struct readpage_param rp_param; + int rc; + + ENTRY; + + *ppage = NULL; + + if (op_data->op_mea1 != NULL) { + __u32 index = op_data->op_stripe_offset; + + dir = op_data->op_mea1->lsm_md_oinfo[index].lmo_root; + } else { + dir = op_data->op_data; + } + LASSERT(dir != NULL); + + mapping = dir->i_mapping; + + rc = mdc_intent_lock(exp, op_data, &it, &enq_req, + cb_op->md_blocking_ast, 0); + if (enq_req != NULL) + ptlrpc_req_finished(enq_req); + + if (rc < 0) { + CERROR("%s: "DFID" lock enqueue fails: rc = %d\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), rc); + RETURN(rc); + } + + rc = 0; + mdc_set_lock_data(exp, &it.d.lustre.it_lock_handle, dir, NULL); + + rp_param.rp_off = op_data->op_hash_offset; + rp_param.rp_hash64 = op_data->op_cli_flags & CLI_HASH64; + page = mdc_page_locate(mapping, &rp_param.rp_off, &start, &end, + rp_param.rp_hash64); + if (IS_ERR(page)) { + CERROR("%s: dir page locate: "DFID" at "LPU64": rc %ld\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), + 
rp_param.rp_off, PTR_ERR(page)); + GOTO(out_unlock, rc = PTR_ERR(page)); + } else if (page != NULL) { + /* + * XXX nikita: not entirely correct handling of a corner case: + * suppose hash chain of entries with hash value HASH crosses + * border between pages P0 and P1. First both P0 and P1 are + * cached, seekdir() is called for some entry from the P0 part + * of the chain. Later P0 goes out of cache. telldir(HASH) + * happens and finds P1, as it starts with matching hash + * value. Remaining entries from P0 part of the chain are + * skipped. (Is that really a bug?) + * + * Possible solutions: 0. don't cache P1 is such case, handle + * it as an "overflow" page. 1. invalidate all pages at + * once. 2. use HASH|1 as an index for P1. + */ + GOTO(hash_collision, page); + } + + rp_param.rp_exp = exp; + rp_param.rp_mod = op_data; + page = read_cache_page(mapping, + hash_x_index(rp_param.rp_off, + rp_param.rp_hash64), + mdc_read_page_remote, &rp_param); + if (IS_ERR(page)) { + CERROR("%s: read cache page: "DFID" at "LPU64": rc %ld\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), + rp_param.rp_off, PTR_ERR(page)); + GOTO(out_unlock, rc = PTR_ERR(page)); + } + + wait_on_page_locked(page); + (void)kmap(page); + if (!PageUptodate(page)) { + CERROR("%s: page not updated: "DFID" at "LPU64": rc %d\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), + rp_param.rp_off, -5); + goto fail; + } + if (!PageChecked(page)) + SetPageChecked(page); + if (PageError(page)) { + CERROR("%s: page error: "DFID" at "LPU64": rc %d\n", + exp->exp_obd->obd_name, PFID(&op_data->op_fid1), + rp_param.rp_off, -5); + goto fail; + } + +hash_collision: + dp = page_address(page); + if (BITS_PER_LONG == 32 && rp_param.rp_hash64) { + start = le64_to_cpu(dp->ldp_hash_start) >> 32; + end = le64_to_cpu(dp->ldp_hash_end) >> 32; + rp_param.rp_off = op_data->op_hash_offset >> 32; + } else { + start = le64_to_cpu(dp->ldp_hash_start); + end = le64_to_cpu(dp->ldp_hash_end); + rp_param.rp_off = 
op_data->op_hash_offset; + } + if (end == start) { + LASSERT(start == rp_param.rp_off); + CWARN("Page-wide hash collision: %#lx\n", (unsigned long)end); +#if BITS_PER_LONG == 32 + CWARN("Real page-wide hash collision at ["LPU64" "LPU64"] with " + "hash "LPU64"\n", le64_to_cpu(dp->ldp_hash_start), + le64_to_cpu(dp->ldp_hash_end), op_data->op_hash_offset); +#endif + + /* + * Fetch whole overflow chain... + * + * XXX not yet. + */ + goto fail; + } + *ppage = page; +out_unlock: + lockh.cookie = it.d.lustre.it_lock_handle; + ldlm_lock_decref(&lockh, it.d.lustre.it_lock_mode); + it.d.lustre.it_lock_handle = 0; + return rc; +fail: + kunmap(page); + mdc_release_page(page, 1); + rc = -EIO; + goto out_unlock; } +/** + * Read one directory entry from the cache. + */ +int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, struct lu_dirent **entp, + struct page **ppage) +{ + struct page *page = NULL; + struct lu_dirpage *dp; + struct lu_dirent *ent; + int rc = 0; + ENTRY; + + CDEBUG(D_INFO, DFID "offset = "LPU64"\n", PFID(&op_data->op_fid1), + op_data->op_hash_offset); + + *ppage = NULL; + *entp = NULL; + + if (op_data->op_hash_offset == MDS_DIR_END_OFF) + RETURN(0); + + rc = mdc_read_page(exp, op_data, cb_op, &page); + if (rc != 0) + RETURN(rc); + + dp = page_address(page); + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) { + /* Skip dummy entry */ + if (le16_to_cpu(ent->lde_namelen) == 0) + continue; + + if (le64_to_cpu(ent->lde_hash) > op_data->op_hash_offset) + break; + } + + /* If it can not find entry in current page, try next page. 
*/ + if (ent == NULL) { + __u64 orig_offset = op_data->op_hash_offset; + + if (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF) { + mdc_release_page(page, 0); + RETURN(0); + } + + op_data->op_hash_offset = le64_to_cpu(dp->ldp_hash_end); + mdc_release_page(page, + le32_to_cpu(dp->ldp_flags) & LDF_COLLIDE); + rc = mdc_read_page(exp, op_data, cb_op, &page); + if (rc != 0) + RETURN(rc); + + if (page != NULL) { + dp = page_address(page); + ent = lu_dirent_start(dp); + } + + op_data->op_hash_offset = orig_offset; + } + + *ppage = page; + *entp = ent; + + RETURN(rc); +} + +#else /* __KERNEL__ */ + +static struct page +*mdc_read_page_remote(struct obd_export *exp, const struct lmv_oinfo *lmo, + const __u64 hash, struct obd_capa *oc) +{ + struct ptlrpc_request *req = NULL; + struct page *page; + int rc; + + OBD_PAGE_ALLOC(page, 0); + if (page == NULL) + return ERR_PTR(-ENOMEM); + + rc = mdc_getpage(exp, &lmo->lmo_fid, hash, oc, &page, 1, &req); + if (req != NULL) + ptlrpc_req_finished(req); + + if (unlikely(rc)) { + OBD_PAGE_FREE(page); + return ERR_PTR(rc); + } + return page; +} + + +static int mdc_read_page(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, + struct page **ppage) +{ + struct page *page; + struct lmv_oinfo *lmo; + int rc = 0; + + /* No local cache for liblustre, always read entry remotely */ + lmo = &op_data->op_mea1->lsm_md_oinfo[op_data->op_stripe_offset]; + page = mdc_read_page_remote(exp, lmo, op_data->op_hash_offset, + op_data->op_capa1); + if (IS_ERR(page)) + return PTR_ERR(page); + + *ppage = page; + + return rc; +} + +int mdc_read_entry(struct obd_export *exp, struct md_op_data *op_data, + struct md_callback *cb_op, struct lu_dirent **entp, + struct page **ppage) +{ + struct page *page = NULL; + struct lu_dirpage *dp; + struct lu_dirent *ent; + int rc; + ENTRY; + + rc = mdc_read_page(exp, op_data, cb_op, &page); + if (rc != 0) + RETURN(rc); + + dp = page_address(page); + if (le64_to_cpu(dp->ldp_hash_end) < 
op_data->op_hash_offset) + GOTO(out, *entp = NULL); + + for (ent = lu_dirent_start(dp); ent != NULL; + ent = lu_dirent_next(ent)) + if (le64_to_cpu(ent->lde_hash) >= op_data->op_hash_offset) + break; + *entp = ent; +out: + + OBD_PAGE_FREE(page); + RETURN(rc); +} + +#endif + static int mdc_statfs(const struct lu_env *env, struct obd_export *exp, struct obd_statfs *osfs, __u64 max_age, __u32 flags) @@ -1111,10 +1798,10 @@ static int mdc_statfs(const struct lu_env *env, * Since the request might also come from lprocfs, so we need * sync this with client_disconnect_export Bug15684 */ - cfs_down_read(&obd->u.cli.cl_sem); + down_read(&obd->u.cli.cl_sem); if (obd->u.cli.cl_import) imp = class_import_get(obd->u.cli.cl_import); - cfs_up_read(&obd->u.cli.cl_sem); + up_read(&obd->u.cli.cl_sem); if (!imp) RETURN(-ENODEV); @@ -1180,9 +1867,9 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf) /* Val is struct getinfo_fid2path result plus path */ vallen = sizeof(*gf) + gf->gf_pathlen; - rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL); - if (rc) - GOTO(out, rc); + rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL); + if (rc != 0 && rc != -EREMOTE) + GOTO(out, rc); if (vallen <= sizeof(*gf)) GOTO(out, rc = -EPROTO); @@ -1197,52 +1884,328 @@ out: return rc; } +static int mdc_ioc_hsm_progress(struct obd_export *exp, + struct hsm_progress_kernel *hpk) +{ + struct obd_import *imp = class_exp2cliimp(exp); + struct hsm_progress_kernel *req_hpk; + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_PROGRESS, + LUSTRE_MDS_VERSION, MDS_HSM_PROGRESS); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, -1, 0); + + /* Copy hsm_progress struct */ + req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS); + if (req_hpk == NULL) + GOTO(out, rc = -EPROTO); + + *req_hpk = *hpk; + req_hpk->hpk_errval = 
lustre_errno_hton(hpk->hpk_errval); + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + GOTO(out, rc); +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives) +{ + __u32 *archive_mask; + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_REGISTER, + LUSTRE_MDS_VERSION, + MDS_HSM_CT_REGISTER); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, -1, 0); + + /* Copy archive mask */ + archive_mask = req_capsule_client_get(&req->rq_pill, + &RMF_MDS_HSM_ARCHIVE); + if (archive_mask == NULL) + GOTO(out, rc = -EPROTO); + + *archive_mask = archives; + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + GOTO(out, rc); +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_current_action(struct obd_export *exp, + struct md_op_data *op_data) +{ + struct hsm_current_action *hca = op_data->op_data; + struct hsm_current_action *req_hca; + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_MDS_HSM_ACTION); + if (req == NULL) + RETURN(-ENOMEM); + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_ACTION); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, + OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0); + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + if (rc) + GOTO(out, rc); + + req_hca = req_capsule_server_get(&req->rq_pill, + &RMF_MDS_HSM_CURRENT_ACTION); + if (req_hca == NULL) + GOTO(out, rc = -EPROTO); + + *hca = *req_hca; + + EXIT; +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp) +{ + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc_pack(imp,
&RQF_MDS_HSM_CT_UNREGISTER, + LUSTRE_MDS_VERSION, + MDS_HSM_CT_UNREGISTER); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, -1, 0); + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + GOTO(out, rc); +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_state_get(struct obd_export *exp, + struct md_op_data *op_data) +{ + struct hsm_user_state *hus = op_data->op_data; + struct hsm_user_state *req_hus; + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_MDS_HSM_STATE_GET); + if (req == NULL) + RETURN(-ENOMEM); + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_GET); + if (rc != 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, + OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0); + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + if (rc) + GOTO(out, rc); + + req_hus = req_capsule_server_get(&req->rq_pill, &RMF_HSM_USER_STATE); + if (req_hus == NULL) + GOTO(out, rc = -EPROTO); + + *hus = *req_hus; + + EXIT; +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_state_set(struct obd_export *exp, + struct md_op_data *op_data) +{ + struct hsm_state_set *hss = op_data->op_data; + struct hsm_state_set *req_hss; + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_MDS_HSM_STATE_SET); + if (req == NULL) + RETURN(-ENOMEM); + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_SET); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, + OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0); + + /* Copy states */ + req_hss = req_capsule_client_get(&req->rq_pill, 
&RMF_HSM_STATE_SET); + if (req_hss == NULL) + GOTO(out, rc = -EPROTO); + *req_hss = *hss; + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + GOTO(out, rc); + + EXIT; +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_request(struct obd_export *exp, + struct hsm_user_request *hur) +{ + struct obd_import *imp = class_exp2cliimp(exp); + struct ptlrpc_request *req; + struct hsm_request *req_hr; + struct hsm_user_item *req_hui; + char *req_opaque; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_REQUEST); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM, RCL_CLIENT, + hur->hur_request.hr_itemcount + * sizeof(struct hsm_user_item)); + req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA, RCL_CLIENT, + hur->hur_request.hr_data_len); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_REQUEST); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, -1, 0); + + /* Copy hsm_request struct */ + req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST); + if (req_hr == NULL) + GOTO(out, rc = -EPROTO); + *req_hr = hur->hur_request; + + /* Copy hsm_user_item structs */ + req_hui = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM); + if (req_hui == NULL) + GOTO(out, rc = -EPROTO); + memcpy(req_hui, hur->hur_user_item, + hur->hur_request.hr_itemcount * sizeof(struct hsm_user_item)); + + /* Copy opaque field */ + req_opaque = req_capsule_client_get(&req->rq_pill, &RMF_GENERIC_DATA); + if (req_opaque == NULL) + GOTO(out, rc = -EPROTO); + memcpy(req_opaque, hur_data(hur), hur->hur_request.hr_data_len); + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + GOTO(out, rc); + +out: + ptlrpc_req_finished(req); + return rc; +} + static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags) { - struct kuc_hdr *lh = (struct kuc_hdr *)buf; + struct kuc_hdr 
*lh = (struct kuc_hdr *)buf; - LASSERT(len <= CR_MAXSIZE); + LASSERT(len <= KUC_CHANGELOG_MSG_MAXSIZE); - lh->kuc_magic = KUC_MAGIC; - lh->kuc_transport = KUC_TRANSPORT_CHANGELOG; - lh->kuc_flags = flags; - lh->kuc_msgtype = CL_RECORD; - lh->kuc_msglen = len; - return lh; + lh->kuc_magic = KUC_MAGIC; + lh->kuc_transport = KUC_TRANSPORT_CHANGELOG; + lh->kuc_flags = flags; + lh->kuc_msgtype = CL_RECORD; + lh->kuc_msglen = len; + return lh; } #define D_CHANGELOG 0 struct changelog_show { - __u64 cs_startrec; - __u32 cs_flags; - cfs_file_t *cs_fp; - char *cs_buf; - struct obd_device *cs_obd; + __u64 cs_startrec; + __u32 cs_flags; + struct file *cs_fp; + char *cs_buf; + struct obd_device *cs_obd; }; -static int changelog_show_cb(const struct lu_env *env, struct llog_handle *llh, +static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh, struct llog_rec_hdr *hdr, void *data) { - struct changelog_show *cs = data; - struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; - struct kuc_hdr *lh; - int len, rc; - ENTRY; - - if ((rec->cr_hdr.lrh_type != CHANGELOG_REC) || - (rec->cr.cr_type >= CL_LAST)) { - CERROR("Not a changelog rec %d/%d\n", rec->cr_hdr.lrh_type, - rec->cr.cr_type); - RETURN(-EINVAL); - } + struct changelog_show *cs = data; + struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; + struct kuc_hdr *lh; + int len, rc; + ENTRY; + + if (rec->cr_hdr.lrh_type != CHANGELOG_REC) { + rc = -EINVAL; + CERROR("%s: not a changelog rec %x/%d: rc = %d\n", + cs->cs_obd->obd_name, rec->cr_hdr.lrh_type, + rec->cr.cr_type, rc); + RETURN(rc); + } - if (rec->cr.cr_index < cs->cs_startrec) { - /* Skip entries earlier than what we are interested in */ - CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n", - rec->cr.cr_index, cs->cs_startrec); - RETURN(0); - } + if (rec->cr.cr_index < cs->cs_startrec) { + /* Skip entries earlier than what we are interested in */ + CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n", + rec->cr.cr_index, 
cs->cs_startrec); + RETURN(0); + } CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID " %.*s\n", rec->cr.cr_index, rec->cr.cr_type, @@ -1265,25 +2228,18 @@ static int changelog_show_cb(const struct lu_env *env, struct llog_handle *llh, static int mdc_changelog_send_thread(void *csdata) { - struct changelog_show *cs = csdata; - struct llog_ctxt *ctxt = NULL; - struct llog_handle *llh = NULL; - struct kuc_hdr *kuch; - int rc; + struct changelog_show *cs = csdata; + struct llog_ctxt *ctxt = NULL; + struct llog_handle *llh = NULL; + struct kuc_hdr *kuch; + int rc; - CDEBUG(D_CHANGELOG, "changelog to fp=%p start "LPU64"\n", - cs->cs_fp, cs->cs_startrec); + CDEBUG(D_CHANGELOG, "changelog to fp=%p start "LPU64"\n", + cs->cs_fp, cs->cs_startrec); - /* - * It's important to daemonize here to close unused FDs. - * The write fd from pipe is already opened by the caller, - * so it's fine to clear all files here - */ - cfs_daemonize("mdc_clg_send_thread"); - - OBD_ALLOC(cs->cs_buf, CR_MAXSIZE); - if (cs->cs_buf == NULL) - GOTO(out, rc = -ENOMEM); + OBD_ALLOC(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE); + if (cs->cs_buf == NULL) + GOTO(out, rc = -ENOMEM); /* Set up the remote catalog handle */ ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT); @@ -1302,7 +2258,7 @@ static int mdc_changelog_send_thread(void *csdata) GOTO(out, rc); } - rc = llog_cat_process(NULL, llh, changelog_show_cb, cs, 0, 0); + rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0); /* Send EOF no matter what our result */ if ((kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), @@ -1312,47 +2268,53 @@ static int mdc_changelog_send_thread(void *csdata) } out: - cfs_put_file(cs->cs_fp); - if (llh) + fput(cs->cs_fp); + if (llh) llog_cat_close(NULL, llh); if (ctxt) llog_ctxt_put(ctxt); - if (cs->cs_buf) - OBD_FREE(cs->cs_buf, CR_MAXSIZE); - OBD_FREE_PTR(cs); - /* detach from parent process so we get cleaned up */ - cfs_daemonize("cl_send"); - return rc; + if (cs->cs_buf) + 
OBD_FREE(cs->cs_buf, KUC_CHANGELOG_MSG_MAXSIZE); + OBD_FREE_PTR(cs); + return rc; } static int mdc_ioc_changelog_send(struct obd_device *obd, struct ioc_changelog *icc) { - struct changelog_show *cs; - int rc; + struct changelog_show *cs; + struct task_struct *task; + int rc; /* Freed in mdc_changelog_send_thread */ OBD_ALLOC_PTR(cs); if (!cs) return -ENOMEM; - cs->cs_obd = obd; - cs->cs_startrec = icc->icc_recno; - /* matching cfs_put_file in mdc_changelog_send_thread */ - cs->cs_fp = cfs_get_fd(icc->icc_id); - cs->cs_flags = icc->icc_flags; - - /* New thread because we should return to user app before - writing into our pipe */ - rc = cfs_create_thread(mdc_changelog_send_thread, cs, CFS_DAEMON_FLAGS); - if (rc >= 0) { - CDEBUG(D_CHANGELOG, "start changelog thread: %d\n", rc); - return 0; - } + cs->cs_obd = obd; + cs->cs_startrec = icc->icc_recno; + /* matching fput in mdc_changelog_send_thread */ + cs->cs_fp = fget(icc->icc_id); + cs->cs_flags = icc->icc_flags; + + /* + * New thread because we should return to user app before + * writing into our pipe + */ + task = kthread_run(mdc_changelog_send_thread, cs, + "mdc_clg_send_thread"); + if (IS_ERR(task)) { + rc = PTR_ERR(task); + CERROR("%s: cannot start changelog thread: rc = %d\n", + obd->obd_name, rc); + OBD_FREE_PTR(cs); + } else { + rc = 0; + CDEBUG(D_CHANGELOG, "%s: started changelog thread\n", + obd->obd_name); + } - CERROR("Failed to start changelog thread: %d\n", rc); - OBD_FREE_PTR(cs); - return rc; + return rc; } static int mdc_ioc_hsm_ct_start(struct obd_export *exp, @@ -1442,24 +2404,79 @@ static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp, RETURN(rc); } +static int mdc_ioc_swap_layouts(struct obd_export *exp, + struct md_op_data *op_data) +{ + struct list_head cancels = LIST_HEAD_INIT(cancels); + struct ptlrpc_request *req; + int rc, count; + struct mdc_swap_layouts *msl, *payload; + ENTRY; + + msl = op_data->op_data; + + /* When the MDT will get the MDS_SWAP_LAYOUTS RPC the + * 
first thing it will do is to cancel the 2 layout + * locks held by this client. + * So the client must cancel its layout locks on the 2 fids + * with the request RPC to avoid extra RPC round trips + */ + count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels, + LCK_EX, MDS_INODELOCK_LAYOUT | + MDS_INODELOCK_XATTR); + count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels, + LCK_EX, MDS_INODELOCK_LAYOUT | + MDS_INODELOCK_XATTR); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_MDS_SWAP_LAYOUTS); + if (req == NULL) { + ldlm_lock_list_put(&cancels, l_bl_ast, count); + RETURN(-ENOMEM); + } + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2); + + rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_swap_layouts_pack(req, op_data); + + payload = req_capsule_client_get(&req->rq_pill, &RMF_SWAP_LAYOUTS); + LASSERT(payload); + + *payload = *msl; + + ptlrpc_request_set_replen(req); + + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); + EXIT; + +out: + ptlrpc_req_finished(req); + return rc; +} + static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg) { struct obd_device *obd = exp->exp_obd; struct obd_ioctl_data *data = karg; struct obd_import *imp = obd->u.cli.cl_import; - struct llog_ctxt *ctxt; int rc; ENTRY; - if (!cfs_try_module_get(THIS_MODULE)) { - CERROR("Can't get module. Is it alive?"); - return -EINVAL; - } + if (!try_module_get(THIS_MODULE)) { + CERROR("Can't get module. 
Is it alive?"); + return -EINVAL; + } switch (cmd) { - case LL_IOC_HSM_CT_START: - rc = mdc_ioc_hsm_ct_start(exp, karg); - GOTO(out, rc); case OBD_IOC_CHANGELOG_SEND: rc = mdc_ioc_changelog_send(obd, karg); GOTO(out, rc); @@ -1472,10 +2489,30 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, NULL); GOTO(out, rc); } - case OBD_IOC_FID2PATH: { - rc = mdc_ioc_fid2path(exp, karg); - GOTO(out, rc); - } + case OBD_IOC_FID2PATH: + rc = mdc_ioc_fid2path(exp, karg); + GOTO(out, rc); + case LL_IOC_HSM_CT_START: + rc = mdc_ioc_hsm_ct_start(exp, karg); + /* ignore if it was already registered on this MDS. */ + if (rc == -EEXIST) + rc = 0; + GOTO(out, rc); + case LL_IOC_HSM_PROGRESS: + rc = mdc_ioc_hsm_progress(exp, karg); + GOTO(out, rc); + case LL_IOC_HSM_STATE_GET: + rc = mdc_ioc_hsm_state_get(exp, karg); + GOTO(out, rc); + case LL_IOC_HSM_STATE_SET: + rc = mdc_ioc_hsm_state_set(exp, karg); + GOTO(out, rc); + case LL_IOC_HSM_ACTION: + rc = mdc_ioc_hsm_current_action(exp, karg); + GOTO(out, rc); + case LL_IOC_HSM_REQUEST: + rc = mdc_ioc_hsm_request(exp, karg); + GOTO(out, rc); case OBD_IOC_CLIENT_RECOVER: rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0); if (rc < 0) @@ -1484,21 +2521,6 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case IOC_OSC_SET_ACTIVE: rc = ptlrpc_set_import_active(imp, data->ioc_offset); GOTO(out, rc); - case OBD_IOC_PARSE: { - ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT); - rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL); - llog_ctxt_put(ctxt); - GOTO(out, rc); - } -#ifdef __KERNEL__ - case OBD_IOC_LLOG_INFO: - case OBD_IOC_LLOG_PRINT: { - ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT); - rc = llog_ioctl(ctxt, cmd, data); - llog_ctxt_put(ctxt); - GOTO(out, rc); - } -#endif case OBD_IOC_POLL_QUOTACHECK: rc = mdc_quota_poll_check(exp, (struct if_quotacheck *)karg); GOTO(out, rc); @@ -1519,57 +2541,60 @@ static int mdc_iocontrol(unsigned int cmd, struct 
obd_export *exp, int len, GOTO(out, rc = -ENODEV); /* copy UUID */ - if (cfs_copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd), - min((int) data->ioc_plen2, - (int) sizeof(struct obd_uuid)))) - GOTO(out, rc = -EFAULT); - - rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf, - cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS), - 0); - if (rc != 0) - GOTO(out, rc); - - if (cfs_copy_to_user(data->ioc_pbuf1, &stat_buf, + if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd), + min((int)data->ioc_plen2, + (int)sizeof(struct obd_uuid)))) + GOTO(out, rc = -EFAULT); + + rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf, + cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS), + 0); + if (rc != 0) + GOTO(out, rc); + + if (copy_to_user(data->ioc_pbuf1, &stat_buf, min((int) data->ioc_plen1, (int) sizeof(stat_buf)))) GOTO(out, rc = -EFAULT); GOTO(out, rc = 0); } - case OBD_IOC_QUOTACTL: { - struct if_quotactl *qctl = karg; - struct obd_quotactl *oqctl; - - OBD_ALLOC_PTR(oqctl); - if (!oqctl) - RETURN(-ENOMEM); - - QCTL_COPY(oqctl, qctl); - rc = obd_quotactl(exp, oqctl); - if (rc == 0) { - QCTL_COPY(qctl, oqctl); - qctl->qc_valid = QC_MDTIDX; - qctl->obd_uuid = obd->u.cli.cl_target_uuid; - } - OBD_FREE_PTR(oqctl); - break; - } - case LL_IOC_GET_CONNECT_FLAGS: { - if (cfs_copy_to_user(uarg, &exp->exp_connect_flags, - sizeof(__u64))) - GOTO(out, rc = -EFAULT); - else - GOTO(out, rc = 0); - } - default: - CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd); - GOTO(out, rc = -ENOTTY); - } + case OBD_IOC_QUOTACTL: { + struct if_quotactl *qctl = karg; + struct obd_quotactl *oqctl; + + OBD_ALLOC_PTR(oqctl); + if (oqctl == NULL) + GOTO(out, rc = -ENOMEM); + + QCTL_COPY(oqctl, qctl); + rc = obd_quotactl(exp, oqctl); + if (rc == 0) { + QCTL_COPY(qctl, oqctl); + qctl->qc_valid = QC_MDTIDX; + qctl->obd_uuid = obd->u.cli.cl_target_uuid; + } + + OBD_FREE_PTR(oqctl); + GOTO(out, rc); + } + case LL_IOC_GET_CONNECT_FLAGS: + if (copy_to_user(uarg, exp_connect_flags_ptr(exp), + 
sizeof(*exp_connect_flags_ptr(exp)))) + GOTO(out, rc = -EFAULT); + + GOTO(out, rc = 0); + case LL_IOC_LOV_SWAP_LAYOUTS: + rc = mdc_ioc_swap_layouts(exp, karg); + GOTO(out, rc); + default: + CERROR("unrecognised ioctl: cmd = %#x\n", cmd); + GOTO(out, rc = -ENOTTY); + } out: - cfs_module_put(THIS_MODULE); + module_put(THIS_MODULE); - return rc; + return rc; } int mdc_get_info_rpc(struct obd_export *exp, @@ -1606,45 +2631,46 @@ int mdc_get_info_rpc(struct obd_export *exp, RCL_SERVER, vallen); ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc == 0) { - tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL); - memcpy(val, tmp, vallen); - if (ptlrpc_rep_need_swab(req)) { - if (KEY_IS(KEY_FID2PATH)) { - lustre_swab_fid2path(val); - } - } - } - ptlrpc_req_finished(req); + rc = ptlrpc_queue_wait(req); + /* -EREMOTE means the get_info result is partial, and it needs to + * continue on another MDT, see fid2path part in lmv_iocontrol */ + if (rc == 0 || rc == -EREMOTE) { + tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL); + memcpy(val, tmp, vallen); + if (ptlrpc_rep_need_swab(req)) { + if (KEY_IS(KEY_FID2PATH)) + lustre_swab_fid2path(val); + } + } + ptlrpc_req_finished(req); - RETURN(rc); + RETURN(rc); } static void lustre_swab_hai(struct hsm_action_item *h) { - __swab32s(&h->hai_len); - __swab32s(&h->hai_action); - lustre_swab_lu_fid(&h->hai_fid); - __swab64s(&h->hai_cookie); - __swab64s(&h->hai_extent.offset); - __swab64s(&h->hai_extent.length); - __swab64s(&h->hai_gid); + __swab32s(&h->hai_len); + __swab32s(&h->hai_action); + lustre_swab_lu_fid(&h->hai_fid); + lustre_swab_lu_fid(&h->hai_dfid); + __swab64s(&h->hai_cookie); + __swab64s(&h->hai_extent.offset); + __swab64s(&h->hai_extent.length); + __swab64s(&h->hai_gid); } static void lustre_swab_hal(struct hsm_action_list *h) { - struct hsm_action_item *hai; - int i; - - __swab32s(&h->hal_version); - __swab32s(&h->hal_count); - __swab32s(&h->hal_archive_num); - hai = hai_zero(h); - 
for (i = 0; i < h->hal_count; i++) { - lustre_swab_hai(hai); - hai = hai_next(hai); - } + struct hsm_action_item *hai; + int i; + + __swab32s(&h->hal_version); + __swab32s(&h->hal_count); + __swab32s(&h->hal_archive_id); + __swab64s(&h->hal_flags); + hai = hai_first(h); + for (i = 0; i < h->hal_count; i++, hai = hai_next(hai)) + lustre_swab_hai(hai); } static void lustre_swab_kuch(struct kuc_hdr *l) @@ -1656,33 +2682,28 @@ static void lustre_swab_kuch(struct kuc_hdr *l) } static int mdc_ioc_hsm_ct_start(struct obd_export *exp, - struct lustre_kernelcomm *lk) + struct lustre_kernelcomm *lk) { - int rc = 0; + struct obd_import *imp = class_exp2cliimp(exp); + __u32 archive = lk->lk_data; + int rc = 0; - if (lk->lk_group != KUC_GRP_HSM) { - CERROR("Bad copytool group %d\n", lk->lk_group); - return -EINVAL; - } + if (lk->lk_group != KUC_GRP_HSM) { + CERROR("Bad copytool group %d\n", lk->lk_group); + return -EINVAL; + } - CDEBUG(D_HSM, "CT start r%d w%d u%d g%d f%#x\n", lk->lk_rfd, lk->lk_wfd, - lk->lk_uid, lk->lk_group, lk->lk_flags); - - if (lk->lk_flags & LK_FLG_STOP) - rc = libcfs_kkuc_group_rem(lk->lk_uid,lk->lk_group); - else { - cfs_file_t *fp = cfs_get_fd(lk->lk_wfd); - rc = libcfs_kkuc_group_add(fp, lk->lk_uid,lk->lk_group, - lk->lk_data); - if (rc && fp) - cfs_put_file(fp); - } + CDEBUG(D_HSM, "CT start r%d w%d u%d g%d f%#x\n", lk->lk_rfd, lk->lk_wfd, + lk->lk_uid, lk->lk_group, lk->lk_flags); - /* lk_data is archive number mask */ - /* TODO: register archive num with mdt so coordinator can choose - correct agent. 
*/ + if (lk->lk_flags & LK_FLG_STOP) { + /* Unregister with the coordinator */ + rc = mdc_ioc_hsm_ct_unregister(imp); + } else { + rc = mdc_ioc_hsm_ct_register(imp, archive); + } - return rc; + return rc; } /** @@ -1692,57 +2713,97 @@ static int mdc_ioc_hsm_ct_start(struct obd_export *exp, */ static int mdc_hsm_copytool_send(int len, void *val) { - struct kuc_hdr *lh = (struct kuc_hdr *)val; - struct hsm_action_list *hal = (struct hsm_action_list *)(lh + 1); - int rc; - ENTRY; - - if (len < sizeof(*lh) + sizeof(*hal)) { - CERROR("Short HSM message %d < %d\n", len, - (int) (sizeof(*lh) + sizeof(*hal))); - RETURN(-EPROTO); - } - if (lh->kuc_magic == __swab16(KUC_MAGIC)) { - lustre_swab_kuch(lh); - lustre_swab_hal(hal); - } else if (lh->kuc_magic != KUC_MAGIC) { - CERROR("Bad magic %x!=%x\n", lh->kuc_magic, KUC_MAGIC); - RETURN(-EPROTO); - } + struct kuc_hdr *lh = (struct kuc_hdr *)val; + struct hsm_action_list *hal = (struct hsm_action_list *)(lh + 1); + int rc; + ENTRY; + + if (len < sizeof(*lh) + sizeof(*hal)) { + CERROR("Short HSM message %d < %d\n", len, + (int) (sizeof(*lh) + sizeof(*hal))); + RETURN(-EPROTO); + } + if (lh->kuc_magic == __swab16(KUC_MAGIC)) { + lustre_swab_kuch(lh); + lustre_swab_hal(hal); + } else if (lh->kuc_magic != KUC_MAGIC) { + CERROR("Bad magic %x!=%x\n", lh->kuc_magic, KUC_MAGIC); + RETURN(-EPROTO); + } - CDEBUG(D_HSM, " Received message mg=%x t=%d m=%d l=%d actions=%d\n", - lh->kuc_magic, lh->kuc_transport, lh->kuc_msgtype, - lh->kuc_msglen, hal->hal_count); + CDEBUG(D_HSM, " Received message mg=%x t=%d m=%d l=%d actions=%d " + "on %s\n", + lh->kuc_magic, lh->kuc_transport, lh->kuc_msgtype, + lh->kuc_msglen, hal->hal_count, hal->hal_fsname); - /* Broadcast to HSM listeners */ - rc = libcfs_kkuc_group_put(KUC_GRP_HSM, lh); + /* Broadcast to HSM listeners */ + rc = libcfs_kkuc_group_put(KUC_GRP_HSM, lh); - RETURN(rc); + RETURN(rc); } -int mdc_set_info_async(const struct lu_env *env, - struct obd_export *exp, - obd_count keylen, void *key, 
- obd_count vallen, void *val, - struct ptlrpc_request_set *set) +/** + * callback function passed to kuc for re-registering each HSM copytool + * running on MDC, after MDT shutdown/recovery. + * @param data copytool registration data + * @param cb_arg callback argument (obd_import) + */ +static int mdc_hsm_ct_reregister(void *data, void *cb_arg) { - struct obd_import *imp = class_exp2cliimp(exp); - int rc = -EINVAL; - ENTRY; + struct kkuc_ct_data *kcd = data; + struct obd_import *imp = (struct obd_import *)cb_arg; + int rc; - if (KEY_IS(KEY_READ_ONLY)) { - if (vallen != sizeof(int)) - RETURN(-EINVAL); + if (kcd == NULL || kcd->kcd_magic != KKUC_CT_DATA_MAGIC) + return -EPROTO; - cfs_spin_lock(&imp->imp_lock); - if (*((int *)val)) { - imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY; - imp->imp_connect_data.ocd_connect_flags |= OBD_CONNECT_RDONLY; - } else { - imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY; - imp->imp_connect_data.ocd_connect_flags &= ~OBD_CONNECT_RDONLY; - } - cfs_spin_unlock(&imp->imp_lock); + if (!obd_uuid_equals(&kcd->kcd_uuid, &imp->imp_obd->obd_uuid)) + return 0; + + CDEBUG(D_HA, "%s: recover copytool registration to MDT (archive=%#x)\n", + imp->imp_obd->obd_name, kcd->kcd_archive); + rc = mdc_ioc_hsm_ct_register(imp, kcd->kcd_archive); + + /* ignore error if the copytool is already registered */ + return (rc == -EEXIST) ? 0 : rc; +} + +/** + * Re-establish all kuc contexts with MDT + * after MDT shutdown/recovery. 
+ */ +static int mdc_kuc_reregister(struct obd_import *imp) +{ + /* re-register HSM agents */ + return libcfs_kkuc_group_foreach(KUC_GRP_HSM, mdc_hsm_ct_reregister, + (void *)imp); +} + +int mdc_set_info_async(const struct lu_env *env, + struct obd_export *exp, + obd_count keylen, void *key, + obd_count vallen, void *val, + struct ptlrpc_request_set *set) +{ + struct obd_import *imp = class_exp2cliimp(exp); + int rc; + ENTRY; + + if (KEY_IS(KEY_READ_ONLY)) { + if (vallen != sizeof(int)) + RETURN(-EINVAL); + + spin_lock(&imp->imp_lock); + if (*((int *)val)) { + imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY; + imp->imp_connect_data.ocd_connect_flags |= + OBD_CONNECT_RDONLY; + } else { + imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY; + imp->imp_connect_data.ocd_connect_flags &= + ~OBD_CONNECT_RDONLY; + } + spin_unlock(&imp->imp_lock); rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION, keylen, key, vallen, val, set); @@ -1756,15 +2817,6 @@ int mdc_set_info_async(const struct lu_env *env, sptlrpc_import_flush_my_ctx(imp); RETURN(0); } - if (KEY_IS(KEY_MDS_CONN)) { - /* mds-mds import */ - cfs_spin_lock(&imp->imp_lock); - imp->imp_server_timeout = 1; - cfs_spin_unlock(&imp->imp_lock); - imp->imp_client->cli_request_portal = MDS_MDS_PORTAL; - CDEBUG(D_OTHER, "%s: timeout / 2\n", exp->exp_obd->obd_name); - RETURN(0); - } if (KEY_IS(KEY_CHANGELOG_CLEAR)) { rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION, keylen, key, vallen, val, set); @@ -1775,26 +2827,55 @@ int mdc_set_info_async(const struct lu_env *env, RETURN(rc); } - RETURN(rc); + CERROR("Unknown key %s\n", (char *)key); + RETURN(-EINVAL); } int mdc_get_info(const struct lu_env *env, struct obd_export *exp, - __u32 keylen, void *key, __u32 *vallen, void *val, - struct lov_stripe_md *lsm) + __u32 keylen, void *key, __u32 *vallen, void *val, + struct lov_stripe_md *lsm) { - int rc = -EINVAL; - - if (KEY_IS(KEY_MAX_EASIZE)) { - int mdsize, *max_easize; - - if (*vallen != sizeof(int)) - 
RETURN(-EINVAL); - mdsize = *(int*)val; - if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize) - exp->exp_obd->u.cli.cl_max_mds_easize = mdsize; - max_easize = val; - *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize; - RETURN(0); + int rc = -EINVAL; + + if (KEY_IS(KEY_MAX_EASIZE)) { + int mdsize, *max_easize; + + if (*vallen != sizeof(int)) + RETURN(-EINVAL); + mdsize = *(int *)val; + if (mdsize > exp->exp_obd->u.cli.cl_max_mds_easize) + exp->exp_obd->u.cli.cl_max_mds_easize = mdsize; + max_easize = val; + *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize; + RETURN(0); + } else if (KEY_IS(KEY_DEFAULT_EASIZE)) { + int *default_easize; + + if (*vallen != sizeof(int)) + RETURN(-EINVAL); + default_easize = val; + *default_easize = exp->exp_obd->u.cli.cl_default_mds_easize; + RETURN(0); + } else if (KEY_IS(KEY_MAX_COOKIESIZE)) { + int mdsize, *max_cookiesize; + + if (*vallen != sizeof(int)) + RETURN(-EINVAL); + mdsize = *(int *)val; + if (mdsize > exp->exp_obd->u.cli.cl_max_mds_cookiesize) + exp->exp_obd->u.cli.cl_max_mds_cookiesize = mdsize; + max_cookiesize = val; + *max_cookiesize = exp->exp_obd->u.cli.cl_max_mds_cookiesize; + RETURN(0); + } else if (KEY_IS(KEY_DEFAULT_COOKIESIZE)) { + int *default_cookiesize; + + if (*vallen != sizeof(int)) + RETURN(-EINVAL); + default_cookiesize = val; + *default_cookiesize = + exp->exp_obd->u.cli.cl_default_mds_cookiesize; + RETURN(0); } else if (KEY_IS(KEY_CONN_DATA)) { struct obd_import *imp = class_exp2cliimp(exp); struct obd_connect_data *data = val; @@ -1901,8 +2982,8 @@ static int mdc_unpin(struct obd_export *exp, struct obd_client_handle *handle, RETURN(rc); } -int mdc_sync(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, struct ptlrpc_request **request) +int mdc_fsync(struct obd_export *exp, const struct lu_fid *fid, + struct obd_capa *oc, struct ptlrpc_request **request) { struct ptlrpc_request *req; int rc; @@ -1967,10 +3048,12 @@ static int mdc_import_event(struct obd_device *obd, struct 
obd_import *imp, break; } - case IMP_EVENT_ACTIVE: { - rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); - break; - } + case IMP_EVENT_ACTIVE: + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); + /* redo the kuc registration after reconnecting */ + if (rc == 0) + rc = mdc_kuc_reregister(imp); + break; case IMP_EVENT_OCD: rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL); break; @@ -1984,70 +3067,13 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, RETURN(rc); } -static int mdc_fid_init(struct obd_export *exp) -{ - struct client_obd *cli = &exp->exp_obd->u.cli; - char *prefix; - int rc; - ENTRY; - - OBD_ALLOC_PTR(cli->cl_seq); - if (cli->cl_seq == NULL) - RETURN(-ENOMEM); - - OBD_ALLOC(prefix, MAX_OBD_NAME + 5); - if (prefix == NULL) - GOTO(out_free_seq, rc = -ENOMEM); - - snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", - exp->exp_obd->obd_name); - - /* Init client side sequence-manager */ - rc = seq_client_init(cli->cl_seq, exp, - LUSTRE_SEQ_METADATA, - prefix, NULL); - OBD_FREE(prefix, MAX_OBD_NAME + 5); - if (rc) - GOTO(out_free_seq, rc); - - RETURN(rc); -out_free_seq: - OBD_FREE_PTR(cli->cl_seq); - cli->cl_seq = NULL; - return rc; -} - -static int mdc_fid_fini(struct obd_export *exp) -{ - struct client_obd *cli = &exp->exp_obd->u.cli; - ENTRY; - - if (cli->cl_seq != NULL) { - seq_client_fini(cli->cl_seq); - OBD_FREE_PTR(cli->cl_seq); - cli->cl_seq = NULL; - } - - RETURN(0); -} - -int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid, - struct md_op_data *op_data) +int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp, + struct lu_fid *fid, struct md_op_data *op_data) { - struct client_obd *cli = &exp->exp_obd->u.cli; - struct lu_client_seq *seq = cli->cl_seq; - ENTRY; - RETURN(seq_client_alloc_fid(NULL, seq, fid)); -} - -/* XXX This method is used only to clear current fid seq - * once fld/mds insert failed */ -static int mdc_fid_delete(struct obd_export *exp, const struct lu_fid *fid) 
-{ - struct client_obd *cli = &exp->exp_obd->u.cli; - - seq_client_flush(cli->cl_seq); - return 0; + struct client_obd *cli = &exp->exp_obd->u.cli; + struct lu_client_seq *seq = cli->cl_seq; + ENTRY; + RETURN(seq_client_alloc_fid(env, seq, fid)); } struct obd_uuid *mdc_get_uuid(struct obd_export *exp) { @@ -2060,48 +3086,66 @@ struct obd_uuid *mdc_get_uuid(struct obd_export *exp) { * recovery, non zero value will be return if the lock can be canceled, * or zero returned for not */ -static int mdc_cancel_for_recovery(struct ldlm_lock *lock) +static int mdc_cancel_weight(struct ldlm_lock *lock) { - if (lock->l_resource->lr_type != LDLM_IBITS) - RETURN(0); + if (lock->l_resource->lr_type != LDLM_IBITS) + RETURN(0); - /* FIXME: if we ever get into a situation where there are too many - * opened files with open locks on a single node, then we really - * should replay these open locks to reget it */ - if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN) - RETURN(0); + /* FIXME: if we ever get into a situation where there are too many + * opened files with open locks on a single node, then we really + * should replay these open locks to reget it */ + if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN) + RETURN(0); + + RETURN(1); +} + +static int mdc_resource_inode_free(struct ldlm_resource *res) +{ + if (res->lr_lvb_inode) + res->lr_lvb_inode = NULL; - RETURN(1); + return 0; } +struct ldlm_valblock_ops inode_lvbo = { + .lvbo_free = mdc_resource_inode_free +}; + static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) { - struct client_obd *cli = &obd->u.cli; - struct lprocfs_static_vars lvars = { 0 }; - int rc; - ENTRY; + struct client_obd *cli = &obd->u.cli; + int rc; + ENTRY; OBD_ALLOC(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock)); if (!cli->cl_rpc_lock) RETURN(-ENOMEM); mdc_init_rpc_lock(cli->cl_rpc_lock); - ptlrpcd_addref(); + rc = ptlrpcd_addref(); + if (rc < 0) + GOTO(err_rpc_lock, rc); OBD_ALLOC(cli->cl_close_lock, sizeof 
(*cli->cl_close_lock)); if (!cli->cl_close_lock) - GOTO(err_rpc_lock, rc = -ENOMEM); + GOTO(err_ptlrpcd_decref, rc = -ENOMEM); mdc_init_rpc_lock(cli->cl_close_lock); rc = client_obd_setup(obd, cfg); if (rc) GOTO(err_close_lock, rc); - lprocfs_mdc_init_vars(&lvars); - lprocfs_obd_setup(obd, lvars.obd_vars); - sptlrpc_lprocfs_cliobd_attach(obd); - ptlrpc_lprocfs_register_obd(obd); +#ifdef LPROCFS + obd->obd_vars = lprocfs_mdc_obd_vars; + lprocfs_seq_obd_setup(obd); + lprocfs_alloc_md_stats(obd, 0); +#endif + sptlrpc_lprocfs_cliobd_attach(obd); + ptlrpc_lprocfs_register_obd(obd); + + ns_register_cancel(obd->obd_namespace, mdc_cancel_weight); - ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery); + obd->obd_namespace->ns_lvbo = &inode_lvbo; rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL); if (rc) { @@ -2113,33 +3157,41 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) err_close_lock: OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock)); +err_ptlrpcd_decref: + ptlrpcd_decref(); err_rpc_lock: OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock)); - ptlrpcd_decref(); RETURN(rc); } /* Initialize the default and maximum LOV EA and cookie sizes. This allows - * us to make MDS RPCs with large enough reply buffers to hold the - * maximum-sized (= maximum striped) EA and cookie without having to - * calculate this (via a call into the LOV + OSCs) each time we make an RPC. */ + * us to make MDS RPCs with large enough reply buffers to hold a default + * sized EA and cookie without having to calculate this (via a call into the + * LOV + OSCs) each time we make an RPC. The maximum size is also tracked + * but not used to avoid wastefully vmalloc()'ing large reply buffers when + * a large number of stripes is possible. If a larger reply buffer is + * required it will be reallocated in the ptlrpc layer due to overflow. 
+ */ static int mdc_init_ea_size(struct obd_export *exp, int easize, - int def_easize, int cookiesize) + int def_easize, int cookiesize, int def_cookiesize) { - struct obd_device *obd = exp->exp_obd; - struct client_obd *cli = &obd->u.cli; - ENTRY; + struct obd_device *obd = exp->exp_obd; + struct client_obd *cli = &obd->u.cli; + ENTRY; - if (cli->cl_max_mds_easize < easize) - cli->cl_max_mds_easize = easize; + if (cli->cl_max_mds_easize < easize) + cli->cl_max_mds_easize = easize; - if (cli->cl_default_mds_easize < def_easize) - cli->cl_default_mds_easize = def_easize; + if (cli->cl_default_mds_easize < def_easize) + cli->cl_default_mds_easize = def_easize; - if (cli->cl_max_mds_cookiesize < cookiesize) - cli->cl_max_mds_cookiesize = cookiesize; + if (cli->cl_max_mds_cookiesize < cookiesize) + cli->cl_max_mds_cookiesize = cookiesize; - RETURN(0); + if (cli->cl_default_mds_cookiesize < def_cookiesize) + cli->cl_default_mds_cookiesize = def_cookiesize; + + RETURN(0); } static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) @@ -2151,13 +3203,14 @@ static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) case OBD_CLEANUP_EARLY: break; case OBD_CLEANUP_EXPORTS: - /* Failsafe, ok if racy */ - if (obd->obd_type->typ_refcnt <= 1) - libcfs_kkuc_group_rem(0, KUC_GRP_HSM); + /* Failsafe, ok if racy */ + if (obd->obd_type->typ_refcnt <= 1) + libcfs_kkuc_group_rem(0, KUC_GRP_HSM, NULL); obd_cleanup_client_import(obd); ptlrpc_lprocfs_unregister_obd(obd); lprocfs_obd_cleanup(obd); + lprocfs_free_md_stats(obd); rc = obd_llog_finish(obd, 0); if (rc != 0) @@ -2183,65 +3236,44 @@ static int mdc_cleanup(struct obd_device *obd) static int mdc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, struct obd_device *tgt, int *index) { - struct llog_ctxt *ctxt; - int rc; - ENTRY; + struct llog_ctxt *ctxt; + int rc; - LASSERT(olg == &obd->obd_olg); + ENTRY; - rc = llog_setup(obd, olg, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL, - 
&llog_client_ops); - if (rc) - RETURN(rc); + LASSERT(olg == &obd->obd_olg); - ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT); - llog_initiator_connect(ctxt); - llog_ctxt_put(ctxt); + rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, tgt, + &llog_client_ops); + if (rc) + RETURN(rc); - rc = llog_setup(obd, olg, LLOG_CHANGELOG_REPL_CTXT, tgt, 0, NULL, - &llog_client_ops); - if (rc == 0) { - ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT); - llog_initiator_connect(ctxt); - llog_ctxt_put(ctxt); - } + ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT); + llog_initiator_connect(ctxt); + llog_ctxt_put(ctxt); - RETURN(rc); + RETURN(0); } static int mdc_llog_finish(struct obd_device *obd, int count) { - struct llog_ctxt *ctxt; - int rc = 0; - ENTRY; + struct llog_ctxt *ctxt; - ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT); - if (ctxt) - rc = llog_cleanup(ctxt); + ENTRY; - ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT); - if (ctxt) - rc = llog_cleanup(ctxt); + ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT); + if (ctxt) + llog_cleanup(NULL, ctxt); - RETURN(rc); + RETURN(0); } static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf) { struct lustre_cfg *lcfg = buf; - struct lprocfs_static_vars lvars = { 0 }; - int rc = 0; - - lprocfs_mdc_init_vars(&lvars); - switch (lcfg->lcfg_command) { - default: - rc = class_process_proc_param(PARAM_MDC, lvars.obd_vars, - lcfg, obd); - if (rc > 0) - rc = 0; - break; - } - return(rc); + int rc = class_process_proc_seq_param(PARAM_MDC, obd->obd_vars, + lcfg, obd); + return (rc > 0 ? 
0: rc); } @@ -2339,27 +3371,6 @@ static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc, RETURN(0); } -static int mdc_connect(const struct lu_env *env, - struct obd_export **exp, - struct obd_device *obd, struct obd_uuid *cluuid, - struct obd_connect_data *data, - void *localdata) -{ - struct obd_import *imp = obd->u.cli.cl_import; - - /* mds-mds import features */ - if (data && (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS)) { - cfs_spin_lock(&imp->imp_lock); - imp->imp_server_timeout = 1; - cfs_spin_unlock(&imp->imp_lock); - imp->imp_client->cli_request_portal = MDS_MDS_PORTAL; - CDEBUG(D_OTHER, "%s: Set 'mds' portal and timeout\n", - obd->obd_name); - } - - return client_connect_import(env, exp, obd, cluuid, data, NULL); -} - struct obd_ops mdc_obd_ops = { .o_owner = THIS_MODULE, .o_setup = mdc_setup, @@ -2367,17 +3378,16 @@ struct obd_ops mdc_obd_ops = { .o_cleanup = mdc_cleanup, .o_add_conn = client_import_add_conn, .o_del_conn = client_import_del_conn, - .o_connect = mdc_connect, + .o_connect = client_connect_import, .o_disconnect = client_disconnect_export, .o_iocontrol = mdc_iocontrol, .o_set_info_async = mdc_set_info_async, .o_statfs = mdc_statfs, .o_pin = mdc_pin, .o_unpin = mdc_unpin, - .o_fid_init = mdc_fid_init, - .o_fid_fini = mdc_fid_fini, + .o_fid_init = client_fid_init, + .o_fid_fini = client_fid_fini, .o_fid_alloc = mdc_fid_alloc, - .o_fid_delete = mdc_fid_delete, .o_import_event = mdc_import_event, .o_llog_init = mdc_llog_init, .o_llog_finish = mdc_llog_finish, @@ -2390,7 +3400,7 @@ struct obd_ops mdc_obd_ops = { struct md_ops mdc_md_ops = { .m_getstatus = mdc_getstatus, - .m_change_cbdata = mdc_change_cbdata, + .m_null_inode = mdc_null_inode, .m_find_cbdata = mdc_find_cbdata, .m_close = mdc_close, .m_create = mdc_create, @@ -2405,8 +3415,8 @@ struct md_ops mdc_md_ops = { .m_setattr = mdc_setattr, .m_setxattr = mdc_setxattr, .m_getxattr = mdc_getxattr, - .m_sync = mdc_sync, - .m_readpage = mdc_readpage, + .m_fsync = mdc_fsync, + 
.m_read_entry = mdc_read_entry, .m_unlink = mdc_unlink, .m_cancel_unused = mdc_cancel_unused, .m_init_ea_size = mdc_init_ea_size, @@ -2425,13 +3435,11 @@ struct md_ops mdc_md_ops = { int __init mdc_init(void) { - int rc; - struct lprocfs_static_vars lvars = { 0 }; - lprocfs_mdc_init_vars(&lvars); - - rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, lvars.module_vars, - LUSTRE_MDC_NAME, NULL); - RETURN(rc); + return class_register_type(&mdc_obd_ops, &mdc_md_ops, true, NULL, +#ifndef HAVE_ONLY_PROCFS_SEQ + NULL, +#endif + LUSTRE_MDC_NAME, NULL); } #ifdef __KERNEL__