X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdc%2Fmdc_request.c;h=50f9600407781e331cd3c38da4bcd47c3c2276d6;hp=e1ebd0aefa57d2eff5865ed9629722d09446759c;hb=848f9e20320cb7c01eaf7f1b5c27f5efd54e4818;hpb=95dbd166988cf0a2912c51305572c78a9aa2d324 diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index e1ebd0a..50f9600 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -1,6 +1,4 @@ -/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- - * vim:expandtab:shiftwidth=8:tabstop=8: - * +/* * GPL HEADER START * * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. @@ -26,17 +24,16 @@ * GPL HEADER END */ /* - * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ * Lustre is a trademark of Sun Microsystems, Inc. */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_MDC #ifdef __KERNEL__ @@ -44,24 +41,26 @@ # include # include # include +# include #else # include #endif #include #include -#include #include -#include #include #include +#include + #include "mdc_internal.h" -#include #define REQUEST_MINOR 244 -static quota_interface_t *quota_interface; -extern quota_interface_t mdc_quota_interface; +struct mdc_renew_capa_args { + struct obd_capa *ra_oc; + renew_capa_cb_t ra_cb; +}; static int mdc_cleanup(struct obd_device *obd); @@ -88,6 +87,24 @@ int mdc_unpack_capa(struct obd_export *exp, struct ptlrpc_request *req, } } +static inline int mdc_queue_wait(struct ptlrpc_request *req) +{ + struct client_obd *cli = &req->rq_import->imp_obd->u.cli; + int rc; + + /* mdc_enter_request() ensures that this client has no more + * than cl_max_rpcs_in_flight RPCs simultaneously inf light + * against an MDT. */ + rc = mdc_enter_request(cli); + if (rc != 0) + return rc; + + rc = ptlrpc_queue_wait(req); + mdc_exit_request(cli); + + return rc; +} + /* Helper that implements most of mdc_getstatus and signal_completed_replay. */ /* XXX this should become mdc_get_info("key"), sending MDS_GET_INFO RPC */ static int send_getstatus(struct obd_import *imp, struct lu_fid *rootfid, @@ -202,20 +219,24 @@ static int mdc_getattr_common(struct obd_export *exp, RETURN(0); } -int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, obd_valid valid, int ea_size, +int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { struct ptlrpc_request *req; int rc; ENTRY; + /* Single MDS without an LMV case */ + if (op_data->op_flags & MF_GET_MDT_IDX) { + op_data->op_mds = 0; + RETURN(0); + } *request = NULL; req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR); if (req == NULL) RETURN(-ENOMEM); - mdc_set_capa_size(req, &RMF_CAPA1, oc); + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR); if (rc) { @@ -223,11 +244,12 @@ int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } - /* MDS_BFLAG_EXT_FLAGS: request "new" flags(bug 9486) */ - mdc_pack_body(req, fid, oc, valid, ea_size, -1, MDS_BFLAG_EXT_FLAGS); + mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, + op_data->op_valid, op_data->op_mode, -1, 0); - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, ea_size); - if (valid & OBD_MD_FLRMTPERM) { + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + op_data->op_mode); + if (op_data->op_valid & OBD_MD_FLRMTPERM) { LASSERT(client_is_remote(exp)); req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, sizeof(struct mdt_remote_perm)); @@ -242,9 +264,7 @@ int mdc_getattr(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } -int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, const char *filename, int namelen, - obd_valid valid, int ea_size, __u32 suppgid, +int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data, struct ptlrpc_request **request) { struct ptlrpc_request *req; @@ -257,8 +277,9 @@ int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid, if (req == NULL) RETURN(-ENOMEM); - mdc_set_capa_size(req, &RMF_CAPA1, oc); - req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, namelen); + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, + op_data->op_namelen + 1); rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_GETATTR_NAME); if (rc) { @@ -266,17 +287,19 @@ int mdc_getattr_name(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } - /* MDS_BFLAG_EXT_FLAGS: request "new" flags(bug 9486) */ - mdc_pack_body(req, fid, oc, valid, ea_size, suppgid, - MDS_BFLAG_EXT_FLAGS); + mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, + op_data->op_valid, op_data->op_mode, + op_data->op_suppgids[0], 0); - if (filename) { + if (op_data->op_name) { char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME); - LASSERT(strnlen(filename, namelen) == namelen - 1); - memcpy(name, filename, namelen); + LASSERT(strnlen(op_data->op_name, op_data->op_namelen) == + op_data->op_namelen); + memcpy(name, op_data->op_name, op_data->op_namelen); } - req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, ea_size); + req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, + op_data->op_mode); ptlrpc_request_set_replen(req); rc = mdc_getattr_common(exp, req); @@ -448,7 +471,7 @@ static int mdc_unpack_acl(struct ptlrpc_request *req, struct lustre_md *md) if (!buf) RETURN(-EPROTO); - acl = posix_acl_from_xattr(buf, body->aclsize); + acl = posix_acl_from_xattr(&init_user_ns, buf, body->aclsize); if (IS_ERR(acl)) { rc = PTR_ERR(acl); CERROR("convert xattr to acl: %d\n", rc); @@ -692,11 +715,11 @@ void mdc_commit_open(struct ptlrpc_request *req) * be put along with freeing \var mod. */ ptlrpc_request_addref(req); - spin_lock(&req->rq_lock); - req->rq_committed = 1; - spin_unlock(&req->rq_lock); - req->rq_cb_data = NULL; - obd_mod_put(mod); + spin_lock(&req->rq_lock); + req->rq_committed = 1; + spin_unlock(&req->rq_lock); + req->rq_cb_data = NULL; + obd_mod_put(mod); } int mdc_set_open_replay_data(struct obd_export *exp, @@ -737,13 +760,13 @@ int mdc_set_open_replay_data(struct obd_export *exp, obd_mod_get(mod); obd_mod_get(mod); - spin_lock(&open_req->rq_lock); - och->och_mod = mod; - mod->mod_och = och; - mod->mod_open_req = open_req; - open_req->rq_cb_data = mod; - open_req->rq_commit_cb = mdc_commit_open; - spin_unlock(&open_req->rq_lock); + spin_lock(&open_req->rq_lock); + och->och_mod = mod; + mod->mod_och = och; + mod->mod_open_req = open_req; + open_req->rq_cb_data = mod; + open_req->rq_commit_cb = mdc_commit_open; + spin_unlock(&open_req->rq_lock); } rec->cr_fid2 = body->fid1; @@ -766,7 +789,14 @@ int mdc_clear_open_replay_data(struct obd_export *exp, struct md_open_data *mod = och->och_mod; ENTRY; - LASSERT(mod != LP_POISON && mod != NULL); + /** + * It is possible to not have \var mod in a case of eviction between + * lookup and ll_file_open(). + **/ + if (mod == NULL) + RETURN(0); + + LASSERT(mod != LP_POISON); mod->mod_och = NULL; och->och_mod = NULL; @@ -829,9 +859,9 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, DEBUG_REQ(D_HA, mod->mod_open_req, "matched open"); /* We no longer want to preserve this open for replay even * though the open was committed. b=3632, b=3633 */ - spin_lock(&mod->mod_open_req->rq_lock); - mod->mod_open_req->rq_replay = 0; - spin_unlock(&mod->mod_open_req->rq_lock); + spin_lock(&mod->mod_open_req->rq_lock); + mod->mod_open_req->rq_replay = 0; + spin_unlock(&mod->mod_open_req->rq_lock); } else { CDEBUG(D_HA, "couldn't find open req; expecting close error\n"); } @@ -874,6 +904,7 @@ int mdc_close(struct obd_export *exp, struct md_op_data *op_data, * exists and return no error in that case */ if (mod) { + DEBUG_REQ(D_HA, req, "Reset ESTALE = %d", rc); LASSERT(mod->mod_open_req != NULL); if (mod->mod_open_req->rq_committed) rc = 0; @@ -921,9 +952,9 @@ int mdc_done_writing(struct obd_export *exp, struct md_op_data *op_data, DEBUG_REQ(D_HA, mod->mod_open_req, "matched setattr"); /* We no longer want to preserve this setattr for replay even * though the open was committed. b=3632, b=3633 */ - spin_lock(&mod->mod_open_req->rq_lock); - mod->mod_open_req->rq_replay = 0; - spin_unlock(&mod->mod_open_req->rq_lock); + spin_lock(&mod->mod_open_req->rq_lock); + mod->mod_open_req->rq_replay = 0; + spin_unlock(&mod->mod_open_req->rq_lock); } mdc_close_pack(req, op_data); @@ -984,9 +1015,9 @@ int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid, req->rq_request_portal = MDS_READPAGE_PORTAL; ptlrpc_at_set_req_timeout(req); - desc = ptlrpc_prep_bulk_imp(req, 1, BULK_GET_SOURCE, MDS_BULK_PORTAL); - if (desc == NULL) - GOTO(out, rc = -ENOMEM); + desc = ptlrpc_prep_bulk_imp(req, 1, 1,BULK_GET_SOURCE, MDS_BULK_PORTAL); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); /* NB req now owns desc and will free it when it gets freed. */ ptlrpc_prep_bulk_page(desc, (struct page *)page, 0, offset); @@ -1005,21 +1036,27 @@ out: EXPORT_SYMBOL(mdc_sendpage); #endif -int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid, - struct obd_capa *oc, __u64 offset, struct page *page, - struct ptlrpc_request **request) +int mdc_readpage(struct obd_export *exp, struct md_op_data *op_data, + struct page **pages, struct ptlrpc_request **request) { struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; + int i; + cfs_waitq_t waitq; + int resends = 0; + struct l_wait_info lwi; int rc; ENTRY; *request = NULL; + cfs_waitq_init(&waitq); + +restart_bulk: req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_READPAGE); if (req == NULL) RETURN(-ENOMEM); - mdc_set_capa_size(req, &RMF_CAPA1, oc); + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_READPAGE); if (rc) { @@ -1030,21 +1067,37 @@ int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid, req->rq_request_portal = MDS_READPAGE_PORTAL; ptlrpc_at_set_req_timeout(req); - desc = ptlrpc_prep_bulk_imp(req, 1, BULK_PUT_SINK, MDS_BULK_PORTAL); + desc = ptlrpc_prep_bulk_imp(req, op_data->op_npages, 1, BULK_PUT_SINK, + MDS_BULK_PORTAL); if (desc == NULL) { ptlrpc_request_free(req); RETURN(-ENOMEM); } /* NB req now owns desc and will free it when it gets freed */ - ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE); - mdc_readdir_pack(req, offset, CFS_PAGE_SIZE, fid, oc); + for (i = 0; i < op_data->op_npages; i++) + ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE); + + mdc_readdir_pack(req, op_data->op_offset, + PAGE_CACHE_SIZE * op_data->op_npages, + &op_data->op_fid1, op_data->op_capa1); ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); if (rc) { ptlrpc_req_finished(req); - RETURN(rc); + if (rc != -ETIMEDOUT) + RETURN(rc); + + resends++; + if (!client_should_resend(resends, &exp->exp_obd->u.cli)) { + CERROR("too many resend retries, returning error\n"); + RETURN(-EIO); + } + lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL); + l_wait_event(waitq, 0, &lwi); + + goto restart_bulk; } rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, @@ -1054,9 +1107,10 @@ int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid, RETURN(rc); } - if (req->rq_bulk->bd_nob_transferred != CFS_PAGE_SIZE) { + if (req->rq_bulk->bd_nob_transferred & ~LU_PAGE_MASK) { CERROR("Unexpected # bytes transferred: %d (%ld expected)\n", - req->rq_bulk->bd_nob_transferred, CFS_PAGE_SIZE); + req->rq_bulk->bd_nob_transferred, + PAGE_CACHE_SIZE * op_data->op_npages); ptlrpc_req_finished(req); RETURN(-EPROTO); } @@ -1065,6 +1119,62 @@ int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid, RETURN(0); } +static int mdc_statfs(const struct lu_env *env, + struct obd_export *exp, struct obd_statfs *osfs, + __u64 max_age, __u32 flags) +{ + struct obd_device *obd = class_exp2obd(exp); + struct ptlrpc_request *req; + struct obd_statfs *msfs; + struct obd_import *imp = NULL; + int rc; + ENTRY; + + /* + * Since the request might also come from lprocfs, so we need + * sync this with client_disconnect_export Bug15684 + */ + down_read(&obd->u.cli.cl_sem); + if (obd->u.cli.cl_import) + imp = class_import_get(obd->u.cli.cl_import); + up_read(&obd->u.cli.cl_sem); + if (!imp) + RETURN(-ENODEV); + + req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_STATFS, + LUSTRE_MDS_VERSION, MDS_STATFS); + if (req == NULL) + GOTO(output, rc = -ENOMEM); + + ptlrpc_request_set_replen(req); + + if (flags & OBD_STATFS_NODELAY) { + /* procfs requests not want stay in wait for avoid deadlock */ + req->rq_no_resend = 1; + req->rq_no_delay = 1; + } + + rc = ptlrpc_queue_wait(req); + if (rc) { + /* check connection error first */ + if (imp->imp_connect_error) + rc = imp->imp_connect_error; + GOTO(out, rc); + } + + msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); + if (msfs == NULL) + GOTO(out, rc = -EPROTO); + + *osfs = *msfs; + EXIT; +out: + ptlrpc_req_finished(req); +output: + class_import_put(imp); + return rc; +} + static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf) { __u32 keylen, vallen; @@ -1077,12 +1187,12 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf) RETURN(-EOVERFLOW); /* Key is KEY_FID2PATH + getinfo_fid2path description */ - keylen = size_round(sizeof(KEY_FID2PATH)) + sizeof(*gf); + keylen = cfs_size_round(sizeof(KEY_FID2PATH)) + sizeof(*gf); OBD_ALLOC(key, keylen); if (key == NULL) RETURN(-ENOMEM); memcpy(key, KEY_FID2PATH, sizeof(KEY_FID2PATH)); - memcpy(key + size_round(sizeof(KEY_FID2PATH)), gf, sizeof(*gf)); + memcpy(key + cfs_size_round(sizeof(KEY_FID2PATH)), gf, sizeof(*gf)); CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n", PFID(&gf->gf_fid), gf->gf_recno, gf->gf_linkno); @@ -1093,9 +1203,9 @@ static int mdc_ioc_fid2path(struct obd_export *exp, struct getinfo_fid2path *gf) /* Val is struct getinfo_fid2path result plus path */ vallen = sizeof(*gf) + gf->gf_pathlen; - rc = obd_get_info(exp, keylen, key, &vallen, gf, NULL); - if (rc) - GOTO(out, rc); + rc = obd_get_info(NULL, exp, keylen, key, &vallen, gf, NULL); + if (rc != 0 && rc != -EREMOTE) + GOTO(out, rc); if (vallen <= sizeof(*gf)) GOTO(out, rc = -EPROTO); @@ -1110,6 +1220,578 @@ out: return rc; } +static int mdc_ioc_hsm_progress(struct obd_export *exp, + struct hsm_progress_kernel *hpk) +{ + struct obd_import *imp = class_exp2cliimp(exp); + struct hsm_progress_kernel *req_hpk; + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_PROGRESS, + LUSTRE_MDS_VERSION, MDS_HSM_PROGRESS); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0); + + /* Copy hsm_progress struct */ + req_hpk = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_PROGRESS); + if (req_hpk == NULL) + GOTO(out, rc = -EPROTO); + + *req_hpk = *hpk; + req_hpk->hpk_errval = lustre_errno_hton(hpk->hpk_errval); + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + GOTO(out, rc); +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_ct_register(struct obd_import *imp, __u32 archives) +{ + __u32 *archive_mask; + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_REGISTER, + LUSTRE_MDS_VERSION, + MDS_HSM_CT_REGISTER); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0); + + /* Copy hsm_progress struct */ + archive_mask = req_capsule_client_get(&req->rq_pill, + &RMF_MDS_HSM_ARCHIVE); + if (archive_mask == NULL) + GOTO(out, rc = -EPROTO); + + *archive_mask = archives; + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + GOTO(out, rc); +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_current_action(struct obd_export *exp, + struct md_op_data *op_data) +{ + struct hsm_current_action *hca = op_data->op_data; + struct hsm_current_action *req_hca; + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_MDS_HSM_ACTION); + if (req == NULL) + RETURN(-ENOMEM); + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_ACTION); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, + OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0); + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + if (rc) + GOTO(out, rc); + + req_hca = req_capsule_server_get(&req->rq_pill, + &RMF_MDS_HSM_CURRENT_ACTION); + if (req_hca == NULL) + GOTO(out, rc = -EPROTO); + + *hca = *req_hca; + + EXIT; +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_ct_unregister(struct obd_import *imp) +{ + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_HSM_CT_UNREGISTER, + LUSTRE_MDS_VERSION, + MDS_HSM_CT_UNREGISTER); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0); + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + GOTO(out, rc); +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_state_get(struct obd_export *exp, + struct md_op_data *op_data) +{ + struct hsm_user_state *hus = op_data->op_data; + struct hsm_user_state *req_hus; + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_MDS_HSM_STATE_GET); + if (req == NULL) + RETURN(-ENOMEM); + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_GET); + if (rc != 0) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, + OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0); + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + if (rc) + GOTO(out, rc); + + req_hus = req_capsule_server_get(&req->rq_pill, &RMF_HSM_USER_STATE); + if (req_hus == NULL) + GOTO(out, rc = -EPROTO); + + *hus = *req_hus; + + EXIT; +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_state_set(struct obd_export *exp, + struct md_op_data *op_data) +{ + struct hsm_state_set *hss = op_data->op_data; + struct hsm_state_set *req_hss; + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_MDS_HSM_STATE_SET); + if (req == NULL) + RETURN(-ENOMEM); + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_STATE_SET); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_pack_body(req, &op_data->op_fid1, op_data->op_capa1, + OBD_MD_FLRMTPERM, 0, op_data->op_suppgids[0], 0); + + /* Copy states */ + req_hss = req_capsule_client_get(&req->rq_pill, &RMF_HSM_STATE_SET); + if (req_hss == NULL) + GOTO(out, rc = -EPROTO); + *req_hss = *hss; + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + GOTO(out, rc); + + EXIT; +out: + ptlrpc_req_finished(req); + return rc; +} + +static int mdc_ioc_hsm_request(struct obd_export *exp, + struct hsm_user_request *hur) +{ + struct obd_import *imp = class_exp2cliimp(exp); + struct ptlrpc_request *req; + struct hsm_request *req_hr; + struct hsm_user_item *req_hui; + char *req_opaque; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(imp, &RQF_MDS_HSM_REQUEST); + if (req == NULL) + GOTO(out, rc = -ENOMEM); + + req_capsule_set_size(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM, RCL_CLIENT, + hur->hur_request.hr_itemcount + * sizeof(struct hsm_user_item)); + req_capsule_set_size(&req->rq_pill, &RMF_GENERIC_DATA, RCL_CLIENT, + hur->hur_request.hr_data_len); + + rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_HSM_REQUEST); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_pack_body(req, NULL, NULL, OBD_MD_FLRMTPERM, 0, 0, 0); + + /* Copy hsm_request struct */ + req_hr = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_REQUEST); + if (req_hr == NULL) + GOTO(out, rc = -EPROTO); + *req_hr = hur->hur_request; + + /* Copy hsm_user_item structs */ + req_hui = req_capsule_client_get(&req->rq_pill, &RMF_MDS_HSM_USER_ITEM); + if (req_hui == NULL) + GOTO(out, rc = -EPROTO); + memcpy(req_hui, hur->hur_user_item, + hur->hur_request.hr_itemcount * sizeof(struct hsm_user_item)); + + /* Copy opaque field */ + req_opaque = req_capsule_client_get(&req->rq_pill, &RMF_GENERIC_DATA); + if (req_opaque == NULL) + GOTO(out, rc = -EPROTO); + memcpy(req_opaque, hur_data(hur), hur->hur_request.hr_data_len); + + ptlrpc_request_set_replen(req); + + rc = mdc_queue_wait(req); + GOTO(out, rc); + +out: + ptlrpc_req_finished(req); + return rc; +} + +static struct kuc_hdr *changelog_kuc_hdr(char *buf, int len, int flags) +{ + struct kuc_hdr *lh = (struct kuc_hdr *)buf; + + LASSERT(len <= CR_MAXSIZE); + + lh->kuc_magic = KUC_MAGIC; + lh->kuc_transport = KUC_TRANSPORT_CHANGELOG; + lh->kuc_flags = flags; + lh->kuc_msgtype = CL_RECORD; + lh->kuc_msglen = len; + return lh; +} + +#define D_CHANGELOG 0 + +struct changelog_show { + __u64 cs_startrec; + __u32 cs_flags; + struct file *cs_fp; + char *cs_buf; + struct obd_device *cs_obd; +}; + +static int changelog_kkuc_cb(const struct lu_env *env, struct llog_handle *llh, + struct llog_rec_hdr *hdr, void *data) +{ + struct changelog_show *cs = data; + struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr; + struct kuc_hdr *lh; + int len, rc; + ENTRY; + + if (rec->cr_hdr.lrh_type != CHANGELOG_REC) { + rc = -EINVAL; + CERROR("%s: not a changelog rec %x/%d: rc = %d\n", + cs->cs_obd->obd_name, rec->cr_hdr.lrh_type, + rec->cr.cr_type, rc); + RETURN(rc); + } + + if (rec->cr.cr_index < cs->cs_startrec) { + /* Skip entries earlier than what we are interested in */ + CDEBUG(D_CHANGELOG, "rec="LPU64" start="LPU64"\n", + rec->cr.cr_index, cs->cs_startrec); + RETURN(0); + } + + CDEBUG(D_CHANGELOG, LPU64" %02d%-5s "LPU64" 0x%x t="DFID" p="DFID + " %.*s\n", rec->cr.cr_index, rec->cr.cr_type, + changelog_type2str(rec->cr.cr_type), rec->cr.cr_time, + rec->cr.cr_flags & CLF_FLAGMASK, + PFID(&rec->cr.cr_tfid), PFID(&rec->cr.cr_pfid), + rec->cr.cr_namelen, changelog_rec_name(&rec->cr)); + + len = sizeof(*lh) + changelog_rec_size(&rec->cr) + rec->cr.cr_namelen; + + /* Set up the message */ + lh = changelog_kuc_hdr(cs->cs_buf, len, cs->cs_flags); + memcpy(lh + 1, &rec->cr, len - sizeof(*lh)); + + rc = libcfs_kkuc_msg_put(cs->cs_fp, lh); + CDEBUG(D_CHANGELOG, "kucmsg fp %p len %d rc %d\n", cs->cs_fp, len,rc); + + RETURN(rc); +} + +static int mdc_changelog_send_thread(void *csdata) +{ + struct changelog_show *cs = csdata; + struct llog_ctxt *ctxt = NULL; + struct llog_handle *llh = NULL; + struct kuc_hdr *kuch; + int rc; + + CDEBUG(D_CHANGELOG, "changelog to fp=%p start "LPU64"\n", + cs->cs_fp, cs->cs_startrec); + + OBD_ALLOC(cs->cs_buf, CR_MAXSIZE); + if (cs->cs_buf == NULL) + GOTO(out, rc = -ENOMEM); + + /* Set up the remote catalog handle */ + ctxt = llog_get_context(cs->cs_obd, LLOG_CHANGELOG_REPL_CTXT); + if (ctxt == NULL) + GOTO(out, rc = -ENOENT); + rc = llog_open(NULL, ctxt, &llh, NULL, CHANGELOG_CATALOG, + LLOG_OPEN_EXISTS); + if (rc) { + CERROR("%s: fail to open changelog catalog: rc = %d\n", + cs->cs_obd->obd_name, rc); + GOTO(out, rc); + } + rc = llog_init_handle(NULL, llh, LLOG_F_IS_CAT, NULL); + if (rc) { + CERROR("llog_init_handle failed %d\n", rc); + GOTO(out, rc); + } + + rc = llog_cat_process(NULL, llh, changelog_kkuc_cb, cs, 0, 0); + + /* Send EOF no matter what our result */ + if ((kuch = changelog_kuc_hdr(cs->cs_buf, sizeof(*kuch), + cs->cs_flags))) { + kuch->kuc_msgtype = CL_EOF; + libcfs_kkuc_msg_put(cs->cs_fp, kuch); + } + +out: + fput(cs->cs_fp); + if (llh) + llog_cat_close(NULL, llh); + if (ctxt) + llog_ctxt_put(ctxt); + if (cs->cs_buf) + OBD_FREE(cs->cs_buf, CR_MAXSIZE); + OBD_FREE_PTR(cs); + return rc; +} + +static int mdc_ioc_changelog_send(struct obd_device *obd, + struct ioc_changelog *icc) +{ + struct changelog_show *cs; + int rc; + + /* Freed in mdc_changelog_send_thread */ + OBD_ALLOC_PTR(cs); + if (!cs) + return -ENOMEM; + + cs->cs_obd = obd; + cs->cs_startrec = icc->icc_recno; + /* matching fput in mdc_changelog_send_thread */ + cs->cs_fp = fget(icc->icc_id); + cs->cs_flags = icc->icc_flags; + + /* + * New thread because we should return to user app before + * writing into our pipe + */ + rc = PTR_ERR(kthread_run(mdc_changelog_send_thread, cs, + "mdc_clg_send_thread")); + if (!IS_ERR_VALUE(rc)) { + CDEBUG(D_CHANGELOG, "start changelog thread\n"); + return 0; + } + + CERROR("Failed to start changelog thread: %d\n", rc); + OBD_FREE_PTR(cs); + return rc; +} + +static int mdc_ioc_hsm_ct_start(struct obd_export *exp, + struct lustre_kernelcomm *lk); + +static int mdc_quotacheck(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + struct ptlrpc_request *req; + struct obd_quotactl *body; + int rc; + ENTRY; + + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), + &RQF_MDS_QUOTACHECK, LUSTRE_MDS_VERSION, + MDS_QUOTACHECK); + if (req == NULL) + RETURN(-ENOMEM); + + body = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL); + *body = *oqctl; + + ptlrpc_request_set_replen(req); + + /* the next poll will find -ENODATA, that means quotacheck is + * going on */ + cli->cl_qchk_stat = -ENODATA; + rc = ptlrpc_queue_wait(req); + if (rc) + cli->cl_qchk_stat = rc; + ptlrpc_req_finished(req); + RETURN(rc); +} + +static int mdc_quota_poll_check(struct obd_export *exp, + struct if_quotacheck *qchk) +{ + struct client_obd *cli = &exp->exp_obd->u.cli; + int rc; + ENTRY; + + qchk->obd_uuid = cli->cl_target_uuid; + memcpy(qchk->obd_type, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)); + + rc = cli->cl_qchk_stat; + /* the client is not the previous one */ + if (rc == CL_NOT_QUOTACHECKED) + rc = -EINTR; + RETURN(rc); +} + +static int mdc_quotactl(struct obd_device *unused, struct obd_export *exp, + struct obd_quotactl *oqctl) +{ + struct ptlrpc_request *req; + struct obd_quotactl *oqc; + int rc; + ENTRY; + + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), + &RQF_MDS_QUOTACTL, LUSTRE_MDS_VERSION, + MDS_QUOTACTL); + if (req == NULL) + RETURN(-ENOMEM); + + oqc = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL); + *oqc = *oqctl; + + ptlrpc_request_set_replen(req); + ptlrpc_at_set_req_timeout(req); + req->rq_no_resend = 1; + + rc = ptlrpc_queue_wait(req); + if (rc) + CERROR("ptlrpc_queue_wait failed, rc: %d\n", rc); + + if (req->rq_repmsg && + (oqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL))) { + *oqctl = *oqc; + } else if (!rc) { + CERROR ("Can't unpack obd_quotactl\n"); + rc = -EPROTO; + } + ptlrpc_req_finished(req); + + RETURN(rc); +} + +static int mdc_ioc_swap_layouts(struct obd_export *exp, + struct md_op_data *op_data) +{ + CFS_LIST_HEAD(cancels); + struct ptlrpc_request *req; + int rc, count; + struct mdc_swap_layouts *msl, *payload; + ENTRY; + + msl = op_data->op_data; + + /* When the MDT will get the MDS_SWAP_LAYOUTS RPC the + * first thing it will do is to cancel the 2 layout + * locks hold by this client. + * So the client must cancel its layout locks on the 2 fids + * with the request RPC to avoid extra RPC round trips + */ + count = mdc_resource_get_unused(exp, &op_data->op_fid1, &cancels, + LCK_CR, MDS_INODELOCK_LAYOUT); + count += mdc_resource_get_unused(exp, &op_data->op_fid2, &cancels, + LCK_CR, MDS_INODELOCK_LAYOUT); + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), + &RQF_MDS_SWAP_LAYOUTS); + if (req == NULL) { + ldlm_lock_list_put(&cancels, l_bl_ast, count); + RETURN(-ENOMEM); + } + + mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1); + mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2); + + rc = mdc_prep_elc_req(exp, req, MDS_SWAP_LAYOUTS, &cancels, count); + if (rc) { + ptlrpc_request_free(req); + RETURN(rc); + } + + mdc_swap_layouts_pack(req, op_data); + + payload = req_capsule_client_get(&req->rq_pill, &RMF_SWAP_LAYOUTS); + LASSERT(payload); + + *payload = *msl; + + ptlrpc_request_set_replen(req); + + rc = ptlrpc_queue_wait(req); + if (rc) + GOTO(out, rc); + EXIT; + +out: + ptlrpc_req_finished(req); + return rc; +} + static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, void *karg, void *uarg) { @@ -1120,26 +1802,48 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, int rc; ENTRY; - if (!try_module_get(THIS_MODULE)) { + if (!cfs_try_module_get(THIS_MODULE)) { CERROR("Can't get module. Is it alive?"); return -EINVAL; } switch (cmd) { + case OBD_IOC_CHANGELOG_SEND: + rc = mdc_ioc_changelog_send(obd, karg); + GOTO(out, rc); case OBD_IOC_CHANGELOG_CLEAR: { - struct ioc_changelog_clear *icc = karg; + struct ioc_changelog *icc = karg; struct changelog_setinfo cs = - {icc->icc_recno, icc->icc_id}; - rc = obd_set_info_async(exp, strlen(KEY_CHANGELOG_CLEAR), + {.cs_recno = icc->icc_recno, .cs_id = icc->icc_id}; + rc = obd_set_info_async(NULL, exp, strlen(KEY_CHANGELOG_CLEAR), KEY_CHANGELOG_CLEAR, sizeof(cs), &cs, NULL); GOTO(out, rc); } - case OBD_IOC_FID2PATH: { - rc = mdc_ioc_fid2path(exp, karg); - GOTO(out, rc); - } + case OBD_IOC_FID2PATH: + rc = mdc_ioc_fid2path(exp, karg); + GOTO(out, rc); + case LL_IOC_HSM_CT_START: + rc = mdc_ioc_hsm_ct_start(exp, karg); + /* ignore if it was already registered on this MDS. */ + if (rc == -EEXIST) + rc = 0; + GOTO(out, rc); + case LL_IOC_HSM_PROGRESS: + rc = mdc_ioc_hsm_progress(exp, karg); + GOTO(out, rc); + case LL_IOC_HSM_STATE_GET: + rc = mdc_ioc_hsm_state_get(exp, karg); + GOTO(out, rc); + case LL_IOC_HSM_STATE_SET: + rc = mdc_ioc_hsm_state_set(exp, karg); + case LL_IOC_HSM_ACTION: + rc = mdc_ioc_hsm_current_action(exp, karg); + GOTO(out, rc); + case LL_IOC_HSM_REQUEST: + rc = mdc_ioc_hsm_request(exp, karg); + GOTO(out, rc); case OBD_IOC_CLIENT_RECOVER: - rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1); + rc = ptlrpc_recover_import(imp, data->ioc_inlbuf1, 0); if (rc < 0) GOTO(out, rc); GOTO(out, rc = 0); @@ -1148,7 +1852,8 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, GOTO(out, rc); case OBD_IOC_PARSE: { ctxt = llog_get_context(exp->exp_obd, LLOG_CONFIG_REPL_CTXT); - rc = class_config_parse_llog(ctxt, data->ioc_inlbuf1, NULL); + rc = class_config_parse_llog(NULL, ctxt, data->ioc_inlbuf1, + NULL); llog_ctxt_put(ctxt); GOTO(out, rc); } @@ -1156,24 +1861,85 @@ static int mdc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, case OBD_IOC_LLOG_INFO: case OBD_IOC_LLOG_PRINT: { ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT); - rc = llog_ioctl(ctxt, cmd, data); + rc = llog_ioctl(NULL, ctxt, cmd, data); llog_ctxt_put(ctxt); GOTO(out, rc); } #endif case OBD_IOC_POLL_QUOTACHECK: - rc = lquota_poll_check(quota_interface, exp, - (struct if_quotacheck *)karg); + rc = mdc_quota_poll_check(exp, (struct if_quotacheck *)karg); GOTO(out, rc); case OBD_IOC_PING_TARGET: rc = ptlrpc_obd_ping(obd); GOTO(out, rc); + /* + * Normally IOC_OBD_STATFS, OBD_IOC_QUOTACTL iocontrol are handled by + * LMV instead of MDC. But when the cluster is upgraded from 1.8, + * there'd be no LMV layer thus we might be called here. Eventually + * this code should be removed. + * bz20731, LU-592. + */ + case IOC_OBD_STATFS: { + struct obd_statfs stat_buf = {0}; + + if (*((__u32 *) data->ioc_inlbuf2) != 0) + GOTO(out, rc = -ENODEV); + + /* copy UUID */ + if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(obd), + min((int)data->ioc_plen2, + (int)sizeof(struct obd_uuid)))) + GOTO(out, rc = -EFAULT); + + rc = mdc_statfs(NULL, obd->obd_self_export, &stat_buf, + cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS), + 0); + if (rc != 0) + GOTO(out, rc); + + if (copy_to_user(data->ioc_pbuf1, &stat_buf, + min((int) data->ioc_plen1, + (int) sizeof(stat_buf)))) + GOTO(out, rc = -EFAULT); + + GOTO(out, rc = 0); + } + case OBD_IOC_QUOTACTL: { + struct if_quotactl *qctl = karg; + struct obd_quotactl *oqctl; + + OBD_ALLOC_PTR(oqctl); + if (!oqctl) + RETURN(-ENOMEM); + + QCTL_COPY(oqctl, qctl); + rc = obd_quotactl(exp, oqctl); + if (rc == 0) { + QCTL_COPY(qctl, oqctl); + qctl->qc_valid = QC_MDTIDX; + qctl->obd_uuid = obd->u.cli.cl_target_uuid; + } + OBD_FREE_PTR(oqctl); + break; + } + case LL_IOC_GET_CONNECT_FLAGS: { + if (copy_to_user(uarg, + exp_connect_flags_ptr(exp), + sizeof(__u64))) + GOTO(out, rc = -EFAULT); + else + GOTO(out, rc = 0); + } + case LL_IOC_LOV_SWAP_LAYOUTS: { + rc = mdc_ioc_swap_layouts(exp, karg); + break; + } default: CERROR("mdc_ioctl(): unrecognised ioctl %#x\n", cmd); GOTO(out, rc = -ENOTTY); } out: - module_put(THIS_MODULE); + cfs_module_put(THIS_MODULE); return rc; } @@ -1212,130 +1978,178 @@ int mdc_get_info_rpc(struct obd_export *exp, RCL_SERVER, vallen); ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - if (rc == 0) { - tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL); - memcpy(val, tmp, vallen); - if (ptlrpc_rep_need_swab(req)) { - if (KEY_IS(KEY_FID2PATH)) { - lustre_swab_fid2path(val); - } - } - } - ptlrpc_req_finished(req); - - RETURN(rc); + rc = ptlrpc_queue_wait(req); + /* -EREMOTE means the get_info result is partial, and it needs to + * continue on another MDT, see fid2path part in lmv_iocontrol */ + if (rc == 0 || rc == -EREMOTE) { + tmp = req_capsule_server_get(&req->rq_pill, &RMF_GETINFO_VAL); + memcpy(val, tmp, vallen); + if (ptlrpc_rep_need_swab(req)) { + if (KEY_IS(KEY_FID2PATH)) + lustre_swab_fid2path(val); + } + } + ptlrpc_req_finished(req); + + RETURN(rc); } static void lustre_swab_hai(struct hsm_action_item *h) { - __swab32s(&h->hai_len); - __swab32s(&h->hai_action); - lustre_swab_lu_fid(&h->hai_fid); - __swab64s(&h->hai_cookie); - __swab64s(&h->hai_extent_start); - __swab64s(&h->hai_extent_end); - __swab64s(&h->hai_gid); + __swab32s(&h->hai_len); + __swab32s(&h->hai_action); + lustre_swab_lu_fid(&h->hai_fid); + lustre_swab_lu_fid(&h->hai_dfid); + __swab64s(&h->hai_cookie); + __swab64s(&h->hai_extent.offset); + __swab64s(&h->hai_extent.length); + __swab64s(&h->hai_gid); } static void lustre_swab_hal(struct hsm_action_list *h) { - struct hsm_action_item *hai; - int i; + struct hsm_action_item *hai; + int i; + + __swab32s(&h->hal_version); + __swab32s(&h->hal_count); + __swab32s(&h->hal_archive_id); + __swab64s(&h->hal_flags); + hai = hai_zero(h); + for (i = 0; i < h->hal_count; i++) { + lustre_swab_hai(hai); + hai = hai_next(hai); + } +} - __swab32s(&h->hal_version); - __swab32s(&h->hal_count); - __swab32s(&h->hal_archive_num); - hai = hai_zero(h); - for (i = 0; i < h->hal_count; i++) { - lustre_swab_hai(hai); - hai = hai_next(hai); - } +static void lustre_swab_kuch(struct kuc_hdr *l) +{ + __swab16s(&l->kuc_magic); + /* __u8 l->kuc_transport */ + __swab16s(&l->kuc_msgtype); + __swab16s(&l->kuc_msglen); +} + +static int mdc_ioc_hsm_ct_start(struct obd_export *exp, + struct lustre_kernelcomm *lk) +{ + struct obd_import *imp = class_exp2cliimp(exp); + __u32 archive = lk->lk_data; + int rc = 0; + + if (lk->lk_group != KUC_GRP_HSM) { + CERROR("Bad copytool group %d\n", lk->lk_group); + return -EINVAL; + } + + CDEBUG(D_HSM, "CT start r%d w%d u%d g%d f%#x\n", lk->lk_rfd, lk->lk_wfd, + lk->lk_uid, lk->lk_group, lk->lk_flags); + + if (lk->lk_flags & LK_FLG_STOP) { + /* Unregister with the coordinator */ + rc = mdc_ioc_hsm_ct_unregister(imp); + } else { + rc = mdc_ioc_hsm_ct_register(imp, archive); + } + + return rc; } /** - * Send a message to any listening copytools, nonblocking - * @param val LNL message (lnl_hdr + hsm_action_list) + * Send a message to any listening copytools + * @param val KUC message (kuc_hdr + hsm_action_list) * @param len total length of message */ static int mdc_hsm_copytool_send(int len, void *val) { - struct lnl_hdr *lh = (struct lnl_hdr *)val; - struct hsm_action_list *hal = (struct hsm_action_list *)(lh + 1); - int rc; - ENTRY; - - if (len < sizeof(*lh) + sizeof(*hal)) { - CERROR("Short HSM message %d < %d\n", len, - (int) (sizeof(*lh) + sizeof(*hal))); - RETURN(-EPROTO); - } - if (lh->lnl_magic == __swab16(LNL_MAGIC)) { - lustre_swab_lnlh(lh); - lustre_swab_hal(hal); - } else if (lh->lnl_magic != LNL_MAGIC) { - CERROR("Bad magic %x!=%x\n", lh->lnl_magic, LNL_MAGIC); - RETURN(-EPROTO); - } + struct kuc_hdr *lh = (struct kuc_hdr *)val; + struct hsm_action_list *hal = (struct hsm_action_list *)(lh + 1); + int rc; + ENTRY; + + if (len < sizeof(*lh) + sizeof(*hal)) { + CERROR("Short HSM message %d < %d\n", len, + (int) (sizeof(*lh) + sizeof(*hal))); + RETURN(-EPROTO); + } + if (lh->kuc_magic == __swab16(KUC_MAGIC)) { + lustre_swab_kuch(lh); + lustre_swab_hal(hal); + } else if (lh->kuc_magic != KUC_MAGIC) { + CERROR("Bad magic %x!=%x\n", lh->kuc_magic, KUC_MAGIC); + RETURN(-EPROTO); + } + + CDEBUG(D_HSM, " Received message mg=%x t=%d m=%d l=%d actions=%d " + "on %s\n", + lh->kuc_magic, lh->kuc_transport, lh->kuc_msgtype, + lh->kuc_msglen, hal->hal_count, hal->hal_fsname); + + /* Broadcast to HSM listeners */ + rc = libcfs_kkuc_group_put(KUC_GRP_HSM, lh); + + RETURN(rc); +} - CDEBUG(D_IOCTL, " Received message mg=%x t=%d m=%d l=%d actions=%d\n", - lh->lnl_magic, lh->lnl_transport, lh->lnl_msgtype, - lh->lnl_msglen, hal->hal_count); +/** + * callback function passed to kuc for re-registering each HSM copytool + * running on MDC, after MDT shutdown/recovery. + * @param data archive id served by the copytool + * @param cb_arg callback argument (obd_import) + */ +static int mdc_hsm_ct_reregister(__u32 data, void *cb_arg) +{ + struct obd_import *imp = (struct obd_import *)cb_arg; + __u32 archive = data; + int rc; - /* Broadcast to HSM listeners */ - rc = libcfs_klnl_msg_put(0, LNL_GRP_HSM, lh); + CDEBUG(D_HA, "recover copytool registration to MDT (archive=%#x)\n", + archive); + rc = mdc_ioc_hsm_ct_register(imp, archive); - RETURN(rc); + /* ignore error if the copytool is already registered */ + return ((rc != 0) && (rc != -EEXIST)) ? rc : 0; } -int mdc_set_info_async(struct obd_export *exp, - obd_count keylen, void *key, - obd_count vallen, void *val, - struct ptlrpc_request_set *set) +/** + * Re-establish all kuc contexts with MDT + * after MDT shutdown/recovery. + */ +static int mdc_kuc_reregister(struct obd_import *imp) { - struct obd_import *imp = class_exp2cliimp(exp); - int rc = -EINVAL; - ENTRY; - - if (KEY_IS(KEY_INIT_RECOV)) { - if (vallen != sizeof(int)) - RETURN(-EINVAL); - spin_lock(&imp->imp_lock); - imp->imp_initial_recov = *(int *)val; - spin_unlock(&imp->imp_lock); - CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n", - exp->exp_obd->obd_name, imp->imp_initial_recov); - RETURN(0); - } - /* Turn off initial_recov after we try all backup servers once */ - if (KEY_IS(KEY_INIT_RECOV_BACKUP)) { - if (vallen != sizeof(int)) - RETURN(-EINVAL); - spin_lock(&imp->imp_lock); - imp->imp_initial_recov_bk = *(int *)val; - if (imp->imp_initial_recov_bk) - imp->imp_initial_recov = 1; - spin_unlock(&imp->imp_lock); - CDEBUG(D_HA, "%s: set imp_initial_recov_bk = %d\n", - exp->exp_obd->obd_name, imp->imp_initial_recov_bk); - RETURN(0); - } - if (KEY_IS(KEY_READ_ONLY)) { - if (vallen != sizeof(int)) - RETURN(-EINVAL); - - spin_lock(&imp->imp_lock); - if (*((int *)val)) { - imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY; - imp->imp_connect_data.ocd_connect_flags |= OBD_CONNECT_RDONLY; - } else { - imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY; - imp->imp_connect_data.ocd_connect_flags &= ~OBD_CONNECT_RDONLY; - } - spin_unlock(&imp->imp_lock); + /* re-register HSM agents */ + return libcfs_kkuc_group_foreach(KUC_GRP_HSM, mdc_hsm_ct_reregister, + (void *)imp); +} - rc = target_set_info_rpc(imp, MDS_SET_INFO, - keylen, key, vallen, val, set); +int mdc_set_info_async(const struct lu_env *env, + struct obd_export *exp, + obd_count keylen, void *key, + obd_count vallen, void *val, + struct ptlrpc_request_set *set) +{ + struct obd_import *imp = class_exp2cliimp(exp); + int rc; + ENTRY; + + if (KEY_IS(KEY_READ_ONLY)) { + if (vallen != sizeof(int)) + RETURN(-EINVAL); + + spin_lock(&imp->imp_lock); + if (*((int *)val)) { + imp->imp_connect_flags_orig |= OBD_CONNECT_RDONLY; + imp->imp_connect_data.ocd_connect_flags |= + OBD_CONNECT_RDONLY; + } else { + imp->imp_connect_flags_orig &= ~OBD_CONNECT_RDONLY; + imp->imp_connect_data.ocd_connect_flags &= + ~OBD_CONNECT_RDONLY; + } + spin_unlock(&imp->imp_lock); + + rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION, + keylen, key, vallen, val, set); RETURN(rc); } if (KEY_IS(KEY_SPTLRPC_CONF)) { @@ -1348,16 +2162,16 @@ int mdc_set_info_async(struct obd_export *exp, } if (KEY_IS(KEY_MDS_CONN)) { /* mds-mds import */ - spin_lock(&imp->imp_lock); - imp->imp_server_timeout = 1; - spin_unlock(&imp->imp_lock); + spin_lock(&imp->imp_lock); + imp->imp_server_timeout = 1; + spin_unlock(&imp->imp_lock); imp->imp_client->cli_request_portal = MDS_MDS_PORTAL; CDEBUG(D_OTHER, "%s: timeout / 2\n", exp->exp_obd->obd_name); RETURN(0); } if (KEY_IS(KEY_CHANGELOG_CLEAR)) { - rc = target_set_info_rpc(imp, MDS_SET_INFO, - keylen, key, vallen, val, set); + rc = do_set_info_async(imp, MDS_SET_INFO, LUSTRE_MDS_VERSION, + keylen, key, vallen, val, set); RETURN(rc); } if (KEY_IS(KEY_HSM_COPYTOOL_SEND)) { @@ -1365,11 +2179,13 @@ int mdc_set_info_async(struct obd_export *exp, RETURN(rc); } - RETURN(rc); + CERROR("Unknown key %s\n", (char *)key); + RETURN(-EINVAL); } -int mdc_get_info(struct obd_export *exp, __u32 keylen, void *key, - __u32 *vallen, void *val, struct lov_stripe_md *lsm) +int mdc_get_info(const struct lu_env *env, struct obd_export *exp, + __u32 keylen, void *key, __u32 *vallen, void *val, + struct lov_stripe_md *lsm) { int rc = -EINVAL; @@ -1384,8 +2200,7 @@ int mdc_get_info(struct obd_export *exp, __u32 keylen, void *key, max_easize = val; *max_easize = exp->exp_obd->u.cli.cl_max_mds_easize; RETURN(0); - } - if (KEY_IS(KEY_CONN_DATA)) { + } else if (KEY_IS(KEY_CONN_DATA)) { struct obd_import *imp = class_exp2cliimp(exp); struct obd_connect_data *data = val; @@ -1394,6 +2209,9 @@ int mdc_get_info(struct obd_export *exp, __u32 keylen, void *key, *data = imp->imp_connect_data; RETURN(0); + } else if (KEY_IS(KEY_TGT_COUNT)) { + *((int *)val) = 1; + RETURN(0); } rc = mdc_get_info_rpc(exp, keylen, key, *vallen, val); @@ -1401,59 +2219,6 @@ int mdc_get_info(struct obd_export *exp, __u32 keylen, void *key, RETURN(rc); } -static int mdc_statfs(struct obd_device *obd, struct obd_statfs *osfs, - __u64 max_age, __u32 flags) -{ - struct ptlrpc_request *req; - struct obd_statfs *msfs; - struct obd_import *imp = NULL; - int rc; - ENTRY; - - - /*Since the request might also come from lprocfs, so we need - *sync this with client_disconnect_export Bug15684*/ - down_read(&obd->u.cli.cl_sem); - if (obd->u.cli.cl_import) - imp = class_import_get(obd->u.cli.cl_import); - up_read(&obd->u.cli.cl_sem); - if (!imp) - RETURN(-ENODEV); - - req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_STATFS, - LUSTRE_MDS_VERSION, MDS_STATFS); - if (req == NULL) - GOTO(output, rc = -ENOMEM); - - ptlrpc_request_set_replen(req); - - if (flags & OBD_STATFS_NODELAY) { - /* procfs requests not want stay in wait for avoid deadlock */ - req->rq_no_resend = 1; - req->rq_no_delay = 1; - } - - rc = ptlrpc_queue_wait(req); - if (rc) { - /* check connection error first */ - if (imp->imp_connect_error) - rc = imp->imp_connect_error; - GOTO(out, rc); - } - - msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS); - if (msfs == NULL) - GOTO(out, rc = -EPROTO); - - *osfs = *msfs; - EXIT; -out: - ptlrpc_req_finished(req); -output: - class_import_put(imp); - return rc; -} - static int mdc_pin(struct obd_export *exp, const struct lu_fid *fid, struct obd_capa *oc, struct obd_client_handle *handle, int flags) @@ -1593,12 +2358,9 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, /* * Flush current sequence to make client obtain new one * from server in case of disconnect/reconnect. - * If range is already empty then no need to flush it. */ - if (cli->cl_seq != NULL && - !range_is_exhausted(&cli->cl_seq->lcs_space)) { + if (cli->cl_seq != NULL) seq_client_flush(cli->cl_seq); - } rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL); break; @@ -1610,14 +2372,18 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, break; } - case IMP_EVENT_ACTIVE: { - rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); - break; - } + case IMP_EVENT_ACTIVE: + rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL); + /* redo the kuc registration after reconnecting */ + if (rc == 0) + rc = mdc_kuc_reregister(imp); + break; case IMP_EVENT_OCD: rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL); break; - + case IMP_EVENT_DEACTIVATE: + case IMP_EVENT_ACTIVATE: + break; default: CERROR("Unknown import event %x\n", event); LBUG(); @@ -1625,76 +2391,50 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp, RETURN(rc); } -static int mdc_fid_init(struct obd_export *exp) +int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid, + struct md_op_data *op_data) { struct client_obd *cli = &exp->exp_obd->u.cli; - char *prefix; - int rc; + struct lu_client_seq *seq = cli->cl_seq; ENTRY; - - OBD_ALLOC_PTR(cli->cl_seq); - if (cli->cl_seq == NULL) - RETURN(-ENOMEM); - - OBD_ALLOC(prefix, MAX_OBD_NAME + 5); - if (prefix == NULL) - GOTO(out_free_seq, rc = -ENOMEM); - - snprintf(prefix, MAX_OBD_NAME + 5, "srv-%s", - exp->exp_obd->obd_name); - - /* Init client side sequence-manager */ - rc = seq_client_init(cli->cl_seq, exp, - LUSTRE_SEQ_METADATA, - prefix, NULL); - OBD_FREE(prefix, MAX_OBD_NAME + 5); - if (rc) - GOTO(out_free_seq, rc); - - RETURN(rc); -out_free_seq: - OBD_FREE_PTR(cli->cl_seq); - cli->cl_seq = NULL; - return rc; + RETURN(seq_client_alloc_fid(NULL, seq, fid)); } -static int mdc_fid_fini(struct obd_export *exp) -{ +struct obd_uuid *mdc_get_uuid(struct obd_export *exp) { struct client_obd *cli = &exp->exp_obd->u.cli; - ENTRY; - - if (cli->cl_seq != NULL) { - seq_client_fini(cli->cl_seq); - OBD_FREE_PTR(cli->cl_seq); - cli->cl_seq = NULL; - } - - RETURN(0); + return &cli->cl_target_uuid; } -int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid, - struct md_op_data *op_data) +/** + * Determine whether the lock can be canceled before replaying it during + * recovery, non zero value will be return if the lock can be canceled, + * or zero returned for not + */ +static int mdc_cancel_for_recovery(struct ldlm_lock *lock) { - struct client_obd *cli = &exp->exp_obd->u.cli; - struct lu_client_seq *seq = cli->cl_seq; - ENTRY; - RETURN(seq_client_alloc_fid(seq, fid)); + if (lock->l_resource->lr_type != LDLM_IBITS) + RETURN(0); + + /* FIXME: if we ever get into a situation where there are too many + * opened files with open locks on a single node, then we really + * should replay these open locks to reget it */ + if (lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_OPEN) + RETURN(0); + + RETURN(1); } -/* XXX This method is used only to clear current fid seq - * once fld/mds insert failed */ -static int mdc_fid_delete(struct obd_export *exp, const struct lu_fid *fid) +static int mdc_resource_inode_free(struct ldlm_resource *res) { - struct client_obd *cli = &exp->exp_obd->u.cli; + if (res->lr_lvb_inode) + res->lr_lvb_inode = NULL; - seq_client_flush(cli->cl_seq); - return 0; + return 0; } -struct obd_uuid *mdc_get_uuid(struct obd_export *exp) { - struct client_obd *cli = &exp->exp_obd->u.cli; - return &cli->cl_target_uuid; -} +struct ldlm_valblock_ops inode_lvbo = { + lvbo_free: mdc_resource_inode_free +}; static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) { @@ -1710,14 +2450,9 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) ptlrpcd_addref(); - OBD_ALLOC(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock)); - if (!cli->cl_setattr_lock) - GOTO(err_rpc_lock, rc = -ENOMEM); - mdc_init_rpc_lock(cli->cl_setattr_lock); - OBD_ALLOC(cli->cl_close_lock, sizeof (*cli->cl_close_lock)); if (!cli->cl_close_lock) - GOTO(err_setattr_lock, rc = -ENOMEM); + GOTO(err_rpc_lock, rc = -ENOMEM); mdc_init_rpc_lock(cli->cl_close_lock); rc = client_obd_setup(obd, cfg); @@ -1728,22 +2463,20 @@ static int mdc_setup(struct obd_device *obd, struct lustre_cfg *cfg) sptlrpc_lprocfs_cliobd_attach(obd); ptlrpc_lprocfs_register_obd(obd); + ns_register_cancel(obd->obd_namespace, mdc_cancel_for_recovery); + + obd->obd_namespace->ns_lvbo = &inode_lvbo; + rc = obd_llog_init(obd, &obd->obd_olg, obd, NULL); if (rc) { mdc_cleanup(obd); CERROR("failed to setup llogging subsystems\n"); } - /* ignore errors */ - libcfs_klnl_start(LNL_TRANSPORT_HSM); - libcfs_klnl_start(LNL_TRANSPORT_CHANGELOG); - RETURN(rc); err_close_lock: OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock)); -err_setattr_lock: - OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock)); err_rpc_lock: OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock)); ptlrpcd_decref(); @@ -1780,19 +2513,16 @@ static int mdc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) switch (stage) { case OBD_CLEANUP_EARLY: + break; case OBD_CLEANUP_EXPORTS: - /* If we set up but never connected, the - client import will not have been cleaned. */ - if (obd->u.cli.cl_import) { - struct obd_import *imp; - down_write(&obd->u.cli.cl_sem); - imp = obd->u.cli.cl_import; - CERROR("client import never connected\n"); - ptlrpc_invalidate_import(imp); - class_destroy_import(imp); - up_write(&obd->u.cli.cl_sem); - obd->u.cli.cl_import = NULL; - } + /* Failsafe, ok if racy */ + if (obd->obd_type->typ_refcnt <= 1) + libcfs_kkuc_group_rem(0, KUC_GRP_HSM); + + obd_cleanup_client_import(obd); + ptlrpc_lprocfs_unregister_obd(obd); + lprocfs_obd_cleanup(obd); + rc = obd_llog_finish(obd, 0); if (rc != 0) CERROR("failed to cleanup llogging subsystems\n"); @@ -1805,15 +2535,9 @@ static int mdc_cleanup(struct obd_device *obd) { struct client_obd *cli = &obd->u.cli; - libcfs_klnl_stop(LNL_TRANSPORT_HSM, LNL_GRP_HSM); - libcfs_klnl_stop(LNL_TRANSPORT_CHANGELOG, 0); - OBD_FREE(cli->cl_rpc_lock, sizeof (*cli->cl_rpc_lock)); - OBD_FREE(cli->cl_setattr_lock, sizeof (*cli->cl_setattr_lock)); OBD_FREE(cli->cl_close_lock, sizeof (*cli->cl_close_lock)); - ptlrpc_lprocfs_unregister_obd(obd); - lprocfs_obd_cleanup(obd); ptlrpcd_decref(); return client_obd_cleanup(obd); @@ -1823,47 +2547,36 @@ static int mdc_cleanup(struct obd_device *obd) static int mdc_llog_init(struct obd_device *obd, struct obd_llog_group *olg, struct obd_device *tgt, int *index) { - struct llog_ctxt *ctxt; - int rc; - ENTRY; + struct llog_ctxt *ctxt; + int rc; - LASSERT(olg == &obd->obd_olg); + ENTRY; - rc = llog_setup(obd, olg, LLOG_LOVEA_REPL_CTXT, tgt, 0, NULL, - &llog_client_ops); - if (rc) - RETURN(rc); + LASSERT(olg == &obd->obd_olg); - ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT); - llog_initiator_connect(ctxt); - llog_ctxt_put(ctxt); + rc = llog_setup(NULL, obd, olg, LLOG_CHANGELOG_REPL_CTXT, tgt, + &llog_client_ops); + if (rc) + RETURN(rc); - rc = llog_setup(obd, olg, LLOG_CHANGELOG_REPL_CTXT, tgt, 0, NULL, - &llog_client_ops); - if (rc == 0) { - ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT); - llog_initiator_connect(ctxt); - llog_ctxt_put(ctxt); - } + ctxt = llog_group_get_ctxt(olg, LLOG_CHANGELOG_REPL_CTXT); + llog_initiator_connect(ctxt); + llog_ctxt_put(ctxt); - RETURN(rc); + RETURN(0); } static int mdc_llog_finish(struct obd_device *obd, int count) { - struct llog_ctxt *ctxt; - int rc = 0; - ENTRY; + struct llog_ctxt *ctxt; - ctxt = llog_get_context(obd, LLOG_LOVEA_REPL_CTXT); - if (ctxt) - rc = llog_cleanup(ctxt); + ENTRY; - ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT); - if (ctxt) - rc = llog_cleanup(ctxt); + ctxt = llog_get_context(obd, LLOG_CHANGELOG_REPL_CTXT); + if (ctxt) + llog_cleanup(NULL, ctxt); - RETURN(rc); + RETURN(0); } static int mdc_process_config(struct obd_device *obd, obd_count len, void *buf) @@ -1925,11 +2638,10 @@ int mdc_get_remote_perm(struct obd_export *exp, const struct lu_fid *fid, } static int mdc_interpret_renew_capa(const struct lu_env *env, - struct ptlrpc_request *req, void *unused, + struct ptlrpc_request *req, void *args, int status) { - struct obd_capa *oc = req->rq_async_args.pointer_arg[0]; - renew_capa_cb_t cb = req->rq_async_args.pointer_arg[1]; + struct mdc_renew_capa_args *ra = args; struct mdt_body *body = NULL; struct lustre_capa *capa; ENTRY; @@ -1949,7 +2661,7 @@ static int mdc_interpret_renew_capa(const struct lu_env *env, GOTO(out, capa = ERR_PTR(-EFAULT)); EXIT; out: - cb(oc, capa); + ra->ra_cb(ra->ra_oc, capa); return 0; } @@ -1957,6 +2669,7 @@ static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc, renew_capa_cb_t cb) { struct ptlrpc_request *req; + struct mdc_renew_capa_args *ra; ENTRY; req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_MDS_GETATTR, @@ -1970,10 +2683,12 @@ static int mdc_renew_capa(struct obd_export *exp, struct obd_capa *oc, mdc_pack_body(req, &oc->c_capa.lc_fid, oc, OBD_MD_FLOSSCAPA, 0, -1, 0); ptlrpc_request_set_replen(req); - req->rq_async_args.pointer_arg[0] = oc; - req->rq_async_args.pointer_arg[1] = cb; + CLASSERT(sizeof(*ra) <= sizeof(req->rq_async_args)); + ra = ptlrpc_req_async_args(req); + ra->ra_oc = oc; + ra->ra_cb = cb; req->rq_interpret_reply = mdc_interpret_renew_capa; - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); RETURN(0); } @@ -1983,13 +2698,13 @@ static int mdc_connect(const struct lu_env *env, struct obd_connect_data *data, void *localdata) { - struct obd_import *imp = obd->u.cli.cl_import; + struct obd_import *imp = obd->u.cli.cl_import; - /* mds-mds import features */ - if (data && (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS)) { - spin_lock(&imp->imp_lock); - imp->imp_server_timeout = 1; - spin_unlock(&imp->imp_lock); + /* mds-mds import features */ + if (data && (data->ocd_connect_flags & OBD_CONNECT_MDS_MDS)) { + spin_lock(&imp->imp_lock); + imp->imp_server_timeout = 1; + spin_unlock(&imp->imp_lock); imp->imp_client->cli_request_portal = MDS_MDS_PORTAL; CDEBUG(D_OTHER, "%s: Set 'mds' portal and timeout\n", obd->obd_name); @@ -2012,21 +2727,23 @@ struct obd_ops mdc_obd_ops = { .o_statfs = mdc_statfs, .o_pin = mdc_pin, .o_unpin = mdc_unpin, - .o_fid_init = mdc_fid_init, - .o_fid_fini = mdc_fid_fini, + .o_fid_init = client_fid_init, + .o_fid_fini = client_fid_fini, .o_fid_alloc = mdc_fid_alloc, - .o_fid_delete = mdc_fid_delete, .o_import_event = mdc_import_event, .o_llog_init = mdc_llog_init, .o_llog_finish = mdc_llog_finish, .o_get_info = mdc_get_info, .o_process_config = mdc_process_config, .o_get_uuid = mdc_get_uuid, + .o_quotactl = mdc_quotactl, + .o_quotacheck = mdc_quotacheck }; struct md_ops mdc_md_ops = { .m_getstatus = mdc_getstatus, - .m_change_cbdata = mdc_change_cbdata, + .m_null_inode = mdc_null_inode, + .m_find_cbdata = mdc_find_cbdata, .m_close = mdc_close, .m_create = mdc_create, .m_done_writing = mdc_done_writing, @@ -2064,24 +2781,14 @@ int __init mdc_init(void) struct lprocfs_static_vars lvars = { 0 }; lprocfs_mdc_init_vars(&lvars); - request_module("lquota"); - quota_interface = PORTAL_SYMBOL_GET(mdc_quota_interface); - init_obd_quota_ops(quota_interface, &mdc_obd_ops); - rc = class_register_type(&mdc_obd_ops, &mdc_md_ops, lvars.module_vars, LUSTRE_MDC_NAME, NULL); - if (rc && quota_interface) - PORTAL_SYMBOL_PUT(mdc_quota_interface); - RETURN(rc); } #ifdef __KERNEL__ static void /*__exit*/ mdc_exit(void) { - if (quota_interface) - PORTAL_SYMBOL_PUT(mdc_quota_interface); - class_unregister_type(LUSTRE_MDC_NAME); }