/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * lustre/mdt/mdt_handler.c
- * Lustre Metadata Target (mdt) request handler
+ * GPL HEADER START
*
- * Copyright (c) 2006 Cluster File Systems, Inc.
- * Author: Peter Braam <braam@clusterfs.com>
- * Author: Andreas Dilger <adilger@clusterfs.com>
- * Author: Phil Schwan <phil@clusterfs.com>
- * Author: Mike Shaver <shaver@clusterfs.com>
- * Author: Nikita Danilov <nikita@clusterfs.com>
- * Author: Huang Hua <huanghua@clusterfs.com>
- * Author: Yury Umanets <umka@clusterfs.com>
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * This file is part of the Lustre file system, http://www.lustre.org
- * Lustre is a trademark of Cluster File Systems, Inc.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * You may have signed or agreed to another license before downloading
- * this software. If so, you are bound by the terms and conditions
- * of that agreement, and the following does not apply to you. See the
- * LICENSE file included with this distribution for more information.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * If you did not agree to a different license, then this copy of Lustre
- * is open source software; you can redistribute it and/or modify it
- * under the terms of version 2 of the GNU General Public License as
- * published by the Free Software Foundation.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
*
- * In either case, Lustre is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * license text for more details.
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * lustre/mdt/mdt_handler.c
+ *
+ * Lustre Metadata Target (mdt) request handler
+ *
+ * Author: Peter Braam <braam@clusterfs.com>
+ * Author: Andreas Dilger <adilger@clusterfs.com>
+ * Author: Phil Schwan <phil@clusterfs.com>
+ * Author: Mike Shaver <shaver@clusterfs.com>
+ * Author: Nikita Danilov <nikita@clusterfs.com>
+ * Author: Huang Hua <huanghua@clusterfs.com>
+ * Author: Yury Umanets <umka@clusterfs.com>
*/
#ifndef EXPORT_SYMTAB
#include <lustre_mds.h>
#include <lustre_mdt.h>
#include "mdt_internal.h"
-#include <linux/lustre_acl.h>
+#include <lustre_acl.h>
#include <lustre_param.h>
mdl_mode_t mdt_mdl_lock_modes[] = {
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
RETURN(err_serious(-ENOMEM));
- repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
rc = next->md_ops->mdo_root_get(info->mti_env, next, &repbody->fid1);
if (rc != 0)
RETURN(rc);
if (IS_ERR(root))
RETURN(PTR_ERR(root));
- capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
+ capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
LASSERT(capa);
capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
static int mdt_statfs(struct mdt_thread_info *info)
{
- struct md_device *next = info->mti_mdt->mdt_child;
- struct obd_statfs *osfs;
- int rc;
+ struct md_device *next = info->mti_mdt->mdt_child;
+ struct ptlrpc_service *svc;
+ struct obd_statfs *osfs;
+ int rc;
ENTRY;
+ svc = info->mti_pill->rc_req->rq_rqbd->rqbd_service;
+
/* This will trigger a watchdog timeout */
OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
- (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
+ (MDT_SERVICE_WATCHDOG_FACTOR *
+ at_get(&svc->srv_at_estimate) / 1000) + 1);
rc = mdt_check_ucred(info);
if (rc)
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
rc = err_serious(-ENOMEM);
} else {
- osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
+ osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
rc = next->md_ops->mdo_statfs(info->mti_env, next,
&info->mti_u.ksfs);
statfs_pack(osfs, &info->mti_u.ksfs);
{
struct mdt_body *b;
struct lu_attr *attr = &info->mti_attr.ma_attr;
-
- b = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+
+ b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
/* Check if Size-on-MDS is enabled. */
if ((mdt_conn_flags(info) & OBD_CONNECT_SOM) &&
if (fid) {
b->fid1 = *fid;
b->valid |= OBD_MD_FLID;
- CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
+
+ /* FIXME: these should be fixed when new igif ready.*/
+ b->ino = fid_oid(fid); /* 1.6 compatibility */
+ b->generation = fid_ver(fid); /* 1.6 compatibility */
+ b->valid |= OBD_MD_FLGENER; /* 1.6 compatibility */
+
+ CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, size="LPU64"\n",
PFID(fid), b->nlink, b->mode, b->size);
}
struct mdt_export_data *med = &req->rq_export->exp_mdt_data;
struct md_attr *ma = &info->mti_attr;
struct lu_attr *la = &ma->ma_attr;
- struct req_capsule *pill = &info->mti_pill;
+ struct req_capsule *pill = info->mti_pill;
const struct lu_env *env = info->mti_env;
struct mdt_body *repbody;
struct lu_buf *buffer = &info->mti_buf;
ma->ma_need = MA_LOV | MA_INODE;
}
+ if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
+ reqbody->valid & OBD_MD_FLDIREA &&
+ lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
+ /* get default stripe info for this dir. */
+ ma->ma_need |= MA_LOV_DEF;
+ }
rc = mo_attr_get(env, next, ma);
if (unlikely(rc)) {
CERROR("getattr error for "DFID": %d\n",
info->mti_mdt->mdt_opts.mo_mds_capa) {
struct lustre_capa *capa;
- capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
+ capa = req_capsule_server_get(pill, &RMF_CAPA1);
LASSERT(capa);
capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
rc = mo_capa_get(env, next, capa, 0);
if (!obj || !mdt->mdt_opts.mo_mds_capa)
RETURN(0);
- body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(body != NULL);
- c = req_capsule_client_get(&info->mti_pill, &RMF_CAPA1);
+ c = req_capsule_client_get(info->mti_pill, &RMF_CAPA1);
LASSERT(c);
- capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
+ capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
LASSERT(capa);
*capa = *c;
rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa, 1);
if (rc == 0)
body->valid |= OBD_MD_FLOSSCAPA;
-
RETURN(rc);
}
static int mdt_getattr(struct mdt_thread_info *info)
{
struct mdt_object *obj = info->mti_object;
- struct req_capsule *pill = &info->mti_pill;
+ struct req_capsule *pill = info->mti_pill;
struct mdt_body *reqbody;
struct mdt_body *repbody;
mode_t mode;
LASSERT(reqbody);
if (reqbody->valid & OBD_MD_FLOSSCAPA) {
- rc = req_capsule_pack(pill);
+ rc = req_capsule_server_pack(pill);
if (unlikely(rc))
- rc = err_serious(rc);
- else {
- rc = mdt_renew_capa(info);
- mdt_shrink_reply(info);
- }
- GOTO(out, rc);
+ RETURN(err_serious(rc));
+ rc = mdt_renew_capa(info);
+ GOTO(out_shrink, rc);
}
LASSERT(obj != NULL);
req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, md_size);
- rc = req_capsule_pack(pill);
+ rc = req_capsule_server_pack(pill);
if (unlikely(rc != 0))
- GOTO(out, rc = err_serious(rc));
+ RETURN(err_serious(rc));
repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
LASSERT(repbody != NULL);
EXIT;
out_shrink:
mdt_shrink_reply(info);
-out:
return rc;
}
static int mdt_is_subdir(struct mdt_thread_info *info)
{
struct mdt_object *o = info->mti_object;
- struct req_capsule *pill = &info->mti_pill;
+ struct req_capsule *pill = info->mti_pill;
const struct mdt_body *body = info->mti_body;
struct mdt_body *repbody;
int rc;
} else
mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
- repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
#endif
if (rc == 0) {
- repbody = req_capsule_server_get(&info->mti_pill,
- &RMF_MDT_BODY);
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
repbody->fid1 = *child_fid;
repbody->valid = OBD_MD_FLID;
}
lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
LASSERT(parent != NULL);
- name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
+ name = req_capsule_client_get(info->mti_pill, &RMF_NAME);
if (name == NULL)
RETURN(err_serious(-EFAULT));
- namelen = req_capsule_get_size(&info->mti_pill, &RMF_NAME,
+ namelen = req_capsule_get_size(info->mti_pill, &RMF_NAME,
RCL_CLIENT) - 1;
LASSERT(namelen >= 0);
* otherwise do not allow empty name, that is the name must contain
* at least one character and the terminating '\0'*/
if (namelen == 0) {
- reqbody =req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
+ reqbody =req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(fid_is_sane(&reqbody->fid2));
name = NULL;
if (unlikely(rc != 0))
mdt_object_unlock(info, child, lhc, 1);
}
- GOTO(out, rc);
+ RETURN(rc);
}
/* step 1: lock parent */
LDLM_LOCK_PUT(lock);
rc = 0;
} else {
+ struct md_attr *ma;
relock:
+ ma = &info->mti_attr;
+
mdt_lock_handle_init(lhc);
mdt_lock_reg_init(lhc, LCK_PR);
LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
&child->mot_obj.mo_lu,
"Object doesn't exist!\n");
+ GOTO(out_child, rc = -ESTALE);
}
+
+ ma->ma_valid = 0;
+ ma->ma_need = MA_INODE;
+ rc = mo_attr_get(info->mti_env, next, ma);
+ if (unlikely(rc != 0))
+ GOTO(out_child, rc);
+
+ /* If the file has not been changed for some time, we return
+ * not only a LOOKUP lock, but also an UPDATE lock and this
+ * might save us RPC on later STAT. For directories, it also
+ * let negative dentry starts working for this dir. */
+ if (ma->ma_valid & MA_INODE &&
+ ma->ma_attr.la_valid & LA_CTIME &&
+ info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
+ ma->ma_attr.la_ctime < cfs_time_current_sec())
+ child_bits |= MDS_INODELOCK_UPDATE;
+
rc = mdt_object_lock(info, child, lhc, child_bits,
MDT_CROSS_LOCK);
lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
if (lock) {
struct mdt_body *repbody;
- struct lu_attr *ma;
/* Debugging code. */
res_id = &lock->l_resource->lr_name;
* Pack Size-on-MDS inode attributes to the body if
* update lock is given.
*/
- repbody = req_capsule_server_get(&info->mti_pill,
+ repbody = req_capsule_server_get(info->mti_pill,
&RMF_MDT_BODY);
- ma = &info->mti_attr.ma_attr;
if (lock->l_policy_data.l_inodebits.bits &
MDS_INODELOCK_UPDATE)
mdt_pack_size2body(info, child);
mdt_object_put(info->mti_env, child);
out_parent:
mdt_object_unlock(info, parent, lhp, 1);
-out:
return rc;
}
int rc;
ENTRY;
- reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
+ reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(reqbody != NULL);
- repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(repbody != NULL);
info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
rc = mdt_init_ucred(info, reqbody);
if (unlikely(rc))
- GOTO(out, rc);
+ GOTO(out_shrink, rc);
rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
}
mdt_exit_ucred(info);
EXIT;
-out:
+out_shrink:
mdt_shrink_reply(info);
return rc;
}
int keylen, rc = 0;
ENTRY;
- rc = lustre_pack_reply(req, 1, NULL, NULL);
+ rc = req_capsule_server_pack(info->mti_pill);
if (rc)
RETURN(rc);
- key = req_capsule_client_get(&info->mti_pill, &RMF_SETINFO_KEY);
+ key = req_capsule_client_get(info->mti_pill, &RMF_SETINFO_KEY);
if (key == NULL) {
DEBUG_REQ(D_HA, req, "no set_info key");
RETURN(-EFAULT);
}
- keylen = req_capsule_get_size(&info->mti_pill, &RMF_SETINFO_KEY,
+ keylen = req_capsule_get_size(info->mti_pill, &RMF_SETINFO_KEY,
RCL_CLIENT);
- val = req_capsule_client_get(&info->mti_pill, &RMF_SETINFO_VAL);
+ val = req_capsule_client_get(info->mti_pill, &RMF_SETINFO_VAL);
if (val == NULL) {
DEBUG_REQ(D_HA, req, "no set_info val");
RETURN(-EFAULT);
}
- if (keylen != (sizeof(KEY_READ_ONLY) - 1) ||
- memcmp(key, KEY_READ_ONLY, keylen) != 0)
+ if (!KEY_IS(KEY_READ_ONLY))
RETURN(-EINVAL);
req->rq_status = 0;
struct l_wait_info *lwi = &info->mti_u.rdpg.mti_wait_info;
int tmpcount;
int tmpsize;
+ int timeout;
int i;
int rc;
ENTRY;
desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
MDS_BULK_PORTAL);
if (desc == NULL)
- GOTO(out, rc = -ENOMEM);
+ RETURN(-ENOMEM);
for (i = 0, tmpcount = rdpg->rp_count;
i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
GOTO(free_desc, rc);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
- GOTO(abort_bulk, rc);
+ GOTO(abort_bulk, rc = 0);
- *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
+ timeout = (int) req->rq_deadline - cfs_time_current_sec();
+ if (timeout < 0)
+ CERROR("Req deadline already passed %lu (now: %lu)\n",
+ req->rq_deadline, cfs_time_current_sec());
+ *lwi = LWI_TIMEOUT(max(timeout, 1) * HZ, NULL, NULL);
rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
LASSERT (rc == 0 || rc == -ETIMEDOUT);
ptlrpc_abort_bulk(desc);
free_desc:
ptlrpc_free_bulk(desc);
-out:
return rc;
}
ma->ma_attr.la_valid = LA_MODE;
ma->ma_valid = MA_INODE;
- kmap(page);
+ cfs_kmap(page);
dp = page_address(page);
offset = (int)((__u32)lu_dirent_start(dp) - (__u32)dp);
}
EXIT;
out:
- kunmap(page);
+ cfs_kunmap(page);
return rc;
}
ENTRY;
- reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
+ reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
if (reqbody == NULL)
RETURN(err_serious(-EFAULT));
cleanup_lwi:
OBD_FREE_PTR(lwi);
cleanup_page:
- __cfs_free_page(page);
+ cfs_free_page(page);
desc_cleanup:
ptlrpc_free_bulk(desc);
RETURN(rc);
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
RETURN(err_serious(-ENOMEM));
- reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
- repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
if (reqbody == NULL || repbody == NULL)
RETURN(err_serious(-EFAULT));
- rc = mdt_check_ucred(info);
- if (rc)
- RETURN(err_serious(rc));
-
/*
* prepare @rdpg before calling lower layers and transfer itself. Here
* reqbody->size contains offset of where to start to read and
for (i = 0; i < rdpg->rp_npages; i++)
if (rdpg->rp_pages[i] != NULL)
- __cfs_free_page(rdpg->rp_pages[i]);
+ cfs_free_page(rdpg->rp_pages[i]);
OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
- OBD_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
+ RETURN(0);
return rc;
}
struct mdt_lock_handle *lhc,
__u32 op)
{
- struct req_capsule *pill = &info->mti_pill;
+ struct req_capsule *pill = info->mti_pill;
struct mdt_device *mdt = info->mti_mdt;
struct mdt_body *repbody;
- int need_shrink = 0;
- int rc;
+ int rc = 0;
ENTRY;
/* pack reply */
- if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
+ if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
mdt->mdt_max_mdsize);
- need_shrink = 1;
- }
- if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) {
+ if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
mdt->mdt_max_cookiesize);
- need_shrink = 1;
- }
- rc = req_capsule_pack(pill);
+
+ rc = req_capsule_server_pack(pill);
if (rc != 0) {
CERROR("Can't pack response, rc %d\n", rc);
RETURN(err_serious(rc));
if (rc != 0)
GOTO(out_ucred, rc = err_serious(rc));
- need_shrink = 0;
if (mdt_check_resent(info, mdt_reconstruct, lhc)) {
rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
GOTO(out_ucred, rc);
}
-
rc = mdt_reint_rec(info, lhc);
EXIT;
out_ucred:
mdt_exit_ucred(info);
out_shrink:
- if (need_shrink)
- mdt_shrink_reply(info);
+ mdt_shrink_reply(info);
return rc;
}
static long mdt_reint_opcode(struct mdt_thread_info *info,
const struct req_format **fmt)
{
- __u32 *ptr;
+ struct mdt_rec_reint *rec;
long opc;
opc = err_serious(-EFAULT);
- ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
- if (ptr != NULL) {
- opc = *ptr;
+ rec = req_capsule_client_get(info->mti_pill, &RMF_REC_REINT);
+ if (rec != NULL) {
+ opc = rec->rr_opcode;
DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
if (opc < REINT_MAX && fmt[opc] != NULL)
- req_capsule_extend(&info->mti_pill, fmt[opc]);
+ req_capsule_extend(info->mti_pill, fmt[opc]);
else {
CERROR("Unsupported opc: %ld\n", opc);
opc = err_serious(opc);
int rc;
static const struct req_format *reint_fmts[REINT_MAX] = {
- [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
- [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
- [REINT_LINK] = &RQF_MDS_REINT_LINK,
- [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
- [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
- [REINT_OPEN] = &RQF_MDS_REINT_OPEN
+ [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
+ [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
+ [REINT_LINK] = &RQF_MDS_REINT_LINK,
+ [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
+ [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
+ [REINT_OPEN] = &RQF_MDS_REINT_OPEN,
+ [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR
};
ENTRY;
RETURN(rc);
}
-/* TODO these two methods not available now. */
-
/* this should sync the whole device */
-static int mdt_device_sync(struct mdt_thread_info *info)
+static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
{
- return 0;
+ struct dt_device *dt = mdt->mdt_bottom;
+ int rc;
+ ENTRY;
+
+ rc = dt->dd_ops->dt_sync(env, dt);
+ RETURN(rc);
}
/* this should sync this object */
static int mdt_object_sync(struct mdt_thread_info *info)
{
- return 0;
+ struct md_object *next;
+ int rc;
+ ENTRY;
+
+ if (!mdt_object_exists(info->mti_object)) {
+ CWARN("Non existing object "DFID"!\n",
+ PFID(mdt_object_fid(info->mti_object)));
+ RETURN(-ESTALE);
+ }
+ next = mdt_object_child(info->mti_object);
+ rc = mo_object_sync(info->mti_env, next);
+
+ RETURN(rc);
}
static int mdt_sync(struct mdt_thread_info *info)
{
- struct req_capsule *pill = &info->mti_pill;
+ struct req_capsule *pill = info->mti_pill;
struct mdt_body *body;
int rc;
ENTRY;
if (fid_seq(&body->fid1) == 0) {
/* sync the whole device */
- rc = req_capsule_pack(pill);
+ rc = req_capsule_server_pack(pill);
if (rc == 0)
- rc = mdt_device_sync(info);
+ rc = mdt_device_sync(info->mti_env, info->mti_mdt);
else
rc = err_serious(rc);
} else {
{
int rc;
ENTRY;
+
+ req_capsule_set(info->mti_pill, &RQF_OBD_PING);
+
rc = target_handle_ping(mdt_info_req(info));
if (rc < 0)
rc = err_serious(rc);
*/
LASSERT(info->mti_dlm_req != NULL);
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE)) {
- info->mti_fail_id = OBD_FAIL_LDLM_ENQUEUE;
- return 0;
- }
-
req = mdt_info_req(info);
/*
sptlrpc_svc_ctx_invalidate(req);
}
+ OBD_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, obd_fail_val);
+
return rc;
}
struct mdt_object *m;
ENTRY;
+ CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f));
o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f);
if (unlikely(IS_ERR(o)))
m = (struct mdt_object *)o;
RETURN(rc);
}
+static inline
+void mdt_save_lock(struct ptlrpc_request *req, struct lustre_handle *h,
+ ldlm_mode_t mode, int decref)
+{
+ ENTRY;
+
+ if (lustre_handle_is_used(h)) {
+ if (decref)
+ mdt_fid_unlock(h, mode);
+ else
+ ptlrpc_save_lock(req, h, mode);
+ h->cookie = 0ull;
+ }
+
+ EXIT;
+}
+
/*
* Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
* to save this lock in req. when transaction committed, req will be released,
struct ptlrpc_request *req = mdt_info_req(info);
ENTRY;
- if (lustre_handle_is_used(&lh->mlh_pdo_lh)) {
- /* Do not save PDO locks to request, just decref. */
- mdt_fid_unlock(&lh->mlh_pdo_lh,
- lh->mlh_pdo_mode);
- lh->mlh_pdo_lh.cookie = 0ull;
- }
-
- if (lustre_handle_is_used(&lh->mlh_reg_lh)) {
- if (decref) {
- mdt_fid_unlock(&lh->mlh_reg_lh,
- lh->mlh_reg_mode);
- } else {
- ptlrpc_save_lock(req, &lh->mlh_reg_lh,
- lh->mlh_reg_mode);
- }
- lh->mlh_reg_lh.cookie = 0ull;
- }
+ mdt_save_lock(req, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
+ mdt_save_lock(req, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
EXIT;
}
const struct lu_env *env;
struct req_capsule *pill;
int rc;
+ ENTRY;
env = info->mti_env;
- pill = &info->mti_pill;
+ pill = info->mti_pill;
body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
if (body == NULL)
- return -EFAULT;
+ RETURN(-EFAULT);
+
+ if (!(body->valid & OBD_MD_FLID))
+ RETURN(0);
if (!fid_is_sane(&body->fid1)) {
CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
- return -EINVAL;
+ RETURN(-EINVAL);
}
/*
} else
rc = PTR_ERR(obj);
- return rc;
+ RETURN(rc);
}
static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
{
- struct req_capsule *pill;
+ struct req_capsule *pill = info->mti_pill;
int rc;
-
ENTRY;
- pill = &info->mti_pill;
if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
rc = mdt_body_unpack(info, flags);
req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
mdt->mdt_max_cookiesize);
- rc = req_capsule_pack(pill);
+ rc = req_capsule_server_pack(pill);
}
RETURN(rc);
}
LASSERT(current->journal_info == NULL);
/*
- * Mask out OBD_FAIL_ONCE, because that will stop
- * correct handling of failed req later in ldlm due to doing
- * obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED without actually
- * correct actions like it is done in target_send_reply_msg().
+ * Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try
+ * to put same checks into handlers like mdt_close(), mdt_reint(),
+ * etc., without talking to mdt authors first. Checking same thing
+ * there again is useless and returning 0 error wihtout packing reply
+ * is buggy! Handlers either pack reply or return error.
+ *
+ * We return 0 here and do not send any reply in order to emulate
+ * network failure. Do not send any reply in case any of NET related
+ * fail_id has occured.
*/
- if (h->mh_fail_id != 0) {
- /*
- * Set to info->mti_fail_id to handler fail_id, it will be used
- * later, and better than use default fail_id.
- */
- if (OBD_FAIL_CHECK(h->mh_fail_id && OBD_FAIL_MASK_LOC)) {
- info->mti_fail_id = h->mh_fail_id;
- RETURN(0);
- }
- }
+ if (OBD_FAIL_CHECK_ORSET(h->mh_fail_id, OBD_FAIL_ONCE))
+ RETURN(0);
rc = 0;
flags = h->mh_flags;
LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
if (h->mh_fmt != NULL) {
- req_capsule_set(&info->mti_pill, h->mh_fmt);
+ req_capsule_set(info->mti_pill, h->mh_fmt);
rc = mdt_unpack_req_pack_rep(info, flags);
}
LASSERT(h->mh_fmt != NULL);
- dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
+ dlm_req = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
if (dlm_req != NULL) {
if (info->mti_mdt->mdt_opts.mo_compat_resname)
rc = mdt_lock_resname_compat(info->mti_mdt,
dlm_req);
info->mti_dlm_req = dlm_req;
} else {
- CERROR("Can't unpack dlm request\n");
rc = -EFAULT;
}
}
* only
*/
rc = h->mh_act(info);
+ if (rc == 0 &&
+ !req->rq_no_reply && req->rq_reply_state == NULL) {
+ DEBUG_REQ(D_ERROR, req, "MDT \"handler\" %s did not "
+ "pack reply and returned 0 error\n",
+ h->mh_name);
+ LBUG();
+ }
serious = is_serious(rc);
rc = clear_serious(rc);
} else
info->mti_mdt->mdt_opts.mo_compat_resname) {
struct ldlm_reply *dlmrep;
- dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+ dlmrep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
if (dlmrep != NULL)
rc = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
}
LBUG();
}
- RETURN(rc);
+ target_send_reply(req, rc, info->mti_fail_id);
+ RETURN(0);
}
void mdt_lock_handle_init(struct mdt_lock_handle *lh)
int i;
struct md_capainfo *ci;
- info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
- for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
- info->mti_rep_buf_size[i] = -1;
- req_capsule_init(&info->mti_pill, req, RCL_SERVER,
- info->mti_rep_buf_size);
+ req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+ info->mti_pill = &req->rq_pill;
/* lock handle */
for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
{
int i;
- req_capsule_fini(&info->mti_pill);
+ req_capsule_fini(info->mti_pill);
if (info->mti_object != NULL) {
mdt_object_put(info->mti_env, info->mti_object);
info->mti_object = NULL;
info->mti_env = NULL;
}
-/* mds/handler.c */
-extern int mds_filter_recovery_request(struct ptlrpc_request *req,
- struct obd_device *obd, int *process);
+static int mdt_filter_recovery_request(struct ptlrpc_request *req,
+ struct obd_device *obd, int *process)
+{
+ switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+ case MDS_CONNECT: /* This will never get here, but for completeness. */
+ case OST_CONNECT: /* This will never get here, but for completeness. */
+ case MDS_DISCONNECT:
+ case OST_DISCONNECT:
+ *process = 1;
+ RETURN(0);
+
+ case MDS_CLOSE:
+ case MDS_DONE_WRITING:
+ case MDS_SYNC: /* used in unmounting */
+ case OBD_PING:
+ case MDS_REINT:
+ case SEQ_QUERY:
+ case FLD_QUERY:
+ case LDLM_ENQUEUE:
+ *process = target_queue_recovery_request(req, obd);
+ RETURN(0);
+
+ default:
+ DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
+ *process = -EAGAIN;
+ RETURN(0);
+ }
+}
+
/*
* Handle recovery. Return:
* +1: continue request processing;
int rc;
int should_process;
DEBUG_REQ(D_INFO, req, "Got new replay");
- rc = mds_filter_recovery_request(req, obd, &should_process);
+ rc = mdt_filter_recovery_request(req, obd, &should_process);
if (rc != 0 || !should_process)
RETURN(rc);
else if (should_process < 0) {
RETURN(+1);
}
-static int mdt_reply(struct ptlrpc_request *req, int rc,
- struct mdt_thread_info *info)
+static int mdt_msg_check_version(struct lustre_msg *msg)
{
- ENTRY;
+ int rc;
-#if 0
- if (req->rq_reply_state == NULL && rc == 0) {
- req->rq_status = rc;
- lustre_pack_reply(req, 1, NULL, NULL);
+ switch (lustre_msg_get_opc(msg)) {
+ case MDS_CONNECT:
+ case MDS_DISCONNECT:
+ case OBD_PING:
+ case SEC_CTX_INIT:
+ case SEC_CTX_INIT_CONT:
+ case SEC_CTX_FINI:
+ rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ lustre_msg_get_opc(msg),
+ lustre_msg_get_version(msg),
+ LUSTRE_OBD_VERSION);
+ break;
+ case MDS_GETSTATUS:
+ case MDS_GETATTR:
+ case MDS_GETATTR_NAME:
+ case MDS_STATFS:
+ case MDS_READPAGE:
+ case MDS_WRITEPAGE:
+ case MDS_IS_SUBDIR:
+ case MDS_REINT:
+ case MDS_CLOSE:
+ case MDS_DONE_WRITING:
+ case MDS_PIN:
+ case MDS_SYNC:
+ case MDS_GETXATTR:
+ case MDS_SETXATTR:
+ case MDS_SET_INFO:
+ case MDS_QUOTACHECK:
+ case MDS_QUOTACTL:
+ case QUOTA_DQACQ:
+ case QUOTA_DQREL:
+ case SEQ_QUERY:
+ case FLD_QUERY:
+ rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ lustre_msg_get_opc(msg),
+ lustre_msg_get_version(msg),
+ LUSTRE_MDS_VERSION);
+ break;
+ case LDLM_ENQUEUE:
+ case LDLM_CONVERT:
+ case LDLM_BL_CALLBACK:
+ case LDLM_CP_CALLBACK:
+ rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ lustre_msg_get_opc(msg),
+ lustre_msg_get_version(msg),
+ LUSTRE_DLM_VERSION);
+ break;
+ case OBD_LOG_CANCEL:
+ case LLOG_ORIGIN_HANDLE_CREATE:
+ case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
+ case LLOG_ORIGIN_HANDLE_READ_HEADER:
+ case LLOG_ORIGIN_HANDLE_CLOSE:
+ case LLOG_ORIGIN_HANDLE_DESTROY:
+ case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
+ case LLOG_CATINFO:
+ rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
+ if (rc)
+ CERROR("bad opc %u version %08x, expecting %08x\n",
+ lustre_msg_get_opc(msg),
+ lustre_msg_get_version(msg),
+ LUSTRE_LOG_VERSION);
+ break;
+ default:
+ CERROR("MDS unknown opcode %d\n", lustre_msg_get_opc(msg));
+ rc = -ENOTSUPP;
}
-#endif
- target_send_reply(req, rc, info->mti_fail_id);
- RETURN(0);
+ return rc;
}
-/* mds/handler.c */
-extern int mds_msg_check_version(struct lustre_msg *msg);
-
static int mdt_handle0(struct ptlrpc_request *req,
struct mdt_thread_info *info,
struct mdt_opc_slice *supported)
ENTRY;
- OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
+ if (OBD_FAIL_CHECK_ORSET(OBD_FAIL_MDS_ALL_REQUEST_NET, OBD_FAIL_ONCE))
+ RETURN(0);
LASSERT(current->journal_info == NULL);
msg = req->rq_reqmsg;
- rc = mds_msg_check_version(msg);
+ rc = mdt_msg_check_version(msg);
if (likely(rc == 0)) {
rc = mdt_recovery(info);
if (likely(rc == +1)) {
supported);
if (likely(h != NULL)) {
rc = mdt_req_handle(info, h, req);
- rc = mdt_reply(req, rc, info);
} else {
CERROR("The unsupported opc: 0x%x\n", lustre_msg_get_opc(msg) );
req->rq_status = -ENOTSUPP;
RETURN(ELDLM_LOCK_REPLACED);
}
- /* This lock might already be given to the client by an resent req,
- * in this case we should return ELDLM_LOCK_ABORTED,
- * so we should check led_held_locks here, but it will affect
- * performance, FIXME
+ /*
+ * Fixup the lock to be given to the client.
*/
- /* Fixup the lock to be given to the client */
lock_res_and_lock(new_lock);
new_lock->l_readers = 0;
new_lock->l_writers = 0;
new_lock->l_export = class_export_get(req->rq_export);
- spin_lock(&req->rq_export->exp_ldlm_data.led_lock);
- list_add(&new_lock->l_export_chain,
- &new_lock->l_export->exp_ldlm_data.led_held_locks);
- spin_unlock(&req->rq_export->exp_ldlm_data.led_lock);
-
new_lock->l_blocking_ast = lock->l_blocking_ast;
new_lock->l_completion_ast = lock->l_completion_ast;
new_lock->l_remote_handle = lock->l_remote_handle;
new_lock->l_flags &= ~LDLM_FL_LOCAL;
+ lustre_hash_add(new_lock->l_export->exp_lock_hash,
+ &new_lock->l_remote_handle,
+ &new_lock->l_exp_hash);
+
unlock_res_and_lock(new_lock);
LDLM_LOCK_PUT(new_lock);
lh->mlh_reg_lh.cookie = 0;
struct obd_export *exp = req->rq_export;
struct lustre_handle remote_hdl;
struct ldlm_request *dlmreq;
- struct list_head *iter;
+ struct ldlm_lock *lock;
if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
return;
- dlmreq = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
+ dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
remote_hdl = dlmreq->lock_handle[0];
- spin_lock(&exp->exp_ldlm_data.led_lock);
- list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
- struct ldlm_lock *lock;
- lock = list_entry(iter, struct ldlm_lock, l_export_chain);
- if (lock == new_lock)
- continue;
- if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
+ lock = lustre_hash_lookup(exp->exp_lock_hash, &remote_hdl);
+ if (lock) {
+ if (lock != new_lock) {
lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie;
lh->mlh_reg_mode = lock->l_granted_mode;
- LDLM_DEBUG(lock, "restoring lock cookie");
+ LDLM_DEBUG(lock, "Restoring lock cookie");
DEBUG_REQ(D_DLMTRACE, req,
"restoring lock cookie "LPX64,
lh->mlh_reg_lh.cookie);
if (old_lock)
*old_lock = LDLM_LOCK_GET(lock);
- spin_unlock(&exp->exp_ldlm_data.led_lock);
+ lh_put(exp->exp_lock_hash, &lock->l_exp_hash);
return;
}
+
+ lh_put(exp->exp_lock_hash, &lock->l_exp_hash);
}
- spin_unlock(&exp->exp_ldlm_data.led_lock);
/*
* If the xid matches, then we know this is a resent request, and allow
int rc;
ENTRY;
- reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
+ reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(reqbody);
- repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+ repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
LASSERT(repbody);
info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
break;
default:
CERROR("Unhandled till now");
- GOTO(out, rc = -EINVAL);
+ GOTO(out_shrink, rc = -EINVAL);
}
rc = mdt_init_ucred(info, reqbody);
if (rc)
- GOTO(out, rc);
+ GOTO(out_shrink, rc);
- req = info->mti_pill.rc_req;
- ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+ req = info->mti_pill->rc_req;
+ ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
/* Get lock from request for possible resent case. */
EXIT;
out_ucred:
mdt_exit_ucred(info);
-out:
+out_shrink:
mdt_shrink_reply(info);
return rc;
}
opc = mdt_reint_opcode(info, intent_fmts);
if (opc < 0)
- GOTO(out, rc = opc);
+ RETURN(opc);
if (mdt_it_flavor[opcode].it_reint != opc) {
CERROR("Reint code %ld doesn't match intent: %d\n",
opc, opcode);
- GOTO(out, rc = err_serious(-EPROTO));
+ RETURN(err_serious(-EPROTO));
}
/* Get lock from request for possible resent case. */
/* Check whether the reply has been packed successfully. */
if (mdt_info_req(info)->rq_repmsg != NULL)
- rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+ rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
if (rep == NULL)
- GOTO(out, rc = err_serious(-EFAULT));
+ RETURN(err_serious(-EFAULT));
/* MDC expects this in any case */
if (rc != 0)
LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh));
rep->lock_policy_res2 = 0;
rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
- GOTO(out, rc);
+ RETURN(rc);
}
rep->lock_policy_res2 = clear_serious(rc);
lhc->mlh_reg_lh.cookie = 0ull;
- rc = ELDLM_LOCK_ABORTED;
- EXIT;
-out:
- return rc;
+ if (rc == -ENOTCONN || rc == -ENODEV) {
+ /*
+ * If it is the disconnect error (ENODEV & ENOCONN), the error
+ * will be returned by rq_status, and client at ptlrpc layer
+ * will detect this, then disconnect, reconnect the import
+ * immediately, instead of impacting the following the rpc.
+ */
+ RETURN(rc);
+ } else {
+ /*
+ * For other cases, the error will be returned by intent.
+ * and client will retrieve the result from intent.
+ */
+ /*
+ * FIXME: when open lock is finished, that should be
+ * checked here.
+ */
+ RETURN(ELDLM_LOCK_ABORTED);
+ }
}
static int mdt_intent_code(long itcode)
if (opc < 0)
RETURN(-EINVAL);
- pill = &info->mti_pill;
+ pill = info->mti_pill;
flv = &mdt_it_flavor[opc];
if (flv->it_fmt != NULL)
struct ptlrpc_request *req = mdt_info_req(info);
if (flv->it_flags & MUTABOR &&
req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
- rc = -EROFS;
+ RETURN(-EROFS);
}
if (rc == 0 && flv->it_act != NULL) {
/* execute policy */
rc = flv->it_act(opc, info, lockp, flags);
- } else
+ } else {
rc = -EOPNOTSUPP;
+ }
RETURN(rc);
}
info = lu_context_key_get(&req->rq_svc_thread->t_env->le_ctx,
&mdt_thread_key);
LASSERT(info != NULL);
- pill = &info->mti_pill;
+ pill = info->mti_pill;
LASSERT(pill->rc_req == req);
if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
if (it != NULL) {
const struct ldlm_request *dlmreq;
__u64 req_bits;
-#if 0
- struct ldlm_lock *lock = *lockp;
-
- LDLM_DEBUG(lock, "intent policy opc: %s\n",
- ldlm_it2str(it->opc));
-#endif
rc = mdt_intent_opc(it->opc, info, lockp, flags);
if (rc == 0)
} else {
/* No intent was provided */
LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
- rc = req_capsule_pack(pill);
+ rc = req_capsule_server_pack(pill);
if (rc)
rc = err_serious(rc);
}
procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry;
conf = (typeof(conf)) {
- .psc_nbufs = MDS_NBUFS,
- .psc_bufsize = MDS_BUFSIZE,
- .psc_max_req_size = MDS_MAXREQSIZE,
- .psc_max_reply_size = MDS_MAXREPSIZE,
- .psc_req_portal = MDS_REQUEST_PORTAL,
- .psc_rep_portal = MDC_REPLY_PORTAL,
- .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
+ .psc_nbufs = MDS_NBUFS,
+ .psc_bufsize = MDS_BUFSIZE,
+ .psc_max_req_size = MDS_MAXREQSIZE,
+ .psc_max_reply_size = MDS_MAXREPSIZE,
+ .psc_req_portal = MDS_REQUEST_PORTAL,
+ .psc_rep_portal = MDC_REPLY_PORTAL,
+ .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
/*
* We'd like to have a mechanism to set this on a per-device
* basis, but alas...
*/
- .psc_min_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
- MDT_MAX_THREADS),
- .psc_max_threads = MDT_MAX_THREADS,
- .psc_ctx_tags = LCT_MD_THREAD
+ .psc_min_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
+ MDT_MAX_THREADS),
+ .psc_max_threads = MDT_MAX_THREADS,
+ .psc_ctx_tags = LCT_MD_THREAD
};
m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
m->mdt_regular_service =
ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
- procfs_entry, NULL, LUSTRE_MDT_NAME);
+ procfs_entry, target_print_req,
+ LUSTRE_MDT_NAME);
if (m->mdt_regular_service == NULL)
RETURN(-ENOMEM);
* ideally.
*/
conf = (typeof(conf)) {
- .psc_nbufs = MDS_NBUFS,
- .psc_bufsize = MDS_BUFSIZE,
- .psc_max_req_size = MDS_MAXREQSIZE,
- .psc_max_reply_size = MDS_MAXREPSIZE,
- .psc_req_portal = MDS_READPAGE_PORTAL,
- .psc_rep_portal = MDC_REPLY_PORTAL,
- .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
- .psc_min_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
- MDT_MAX_THREADS),
- .psc_max_threads = MDT_MAX_THREADS,
- .psc_ctx_tags = LCT_MD_THREAD
+ .psc_nbufs = MDS_NBUFS,
+ .psc_bufsize = MDS_BUFSIZE,
+ .psc_max_req_size = MDS_MAXREQSIZE,
+ .psc_max_reply_size = MDS_MAXREPSIZE,
+ .psc_req_portal = MDS_READPAGE_PORTAL,
+ .psc_rep_portal = MDC_REPLY_PORTAL,
+ .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+ .psc_min_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
+ MDT_MAX_THREADS),
+ .psc_max_threads = MDT_MAX_THREADS,
+ .psc_ctx_tags = LCT_MD_THREAD
};
m->mdt_readpage_service =
ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
LUSTRE_MDT_NAME "_readpage",
- procfs_entry, NULL, "mdt_rdpg");
+ procfs_entry, target_print_req,"mdt_rdpg");
if (m->mdt_readpage_service == NULL) {
CERROR("failed to start readpage service\n");
* setattr service configuration.
*/
conf = (typeof(conf)) {
- .psc_nbufs = MDS_NBUFS,
- .psc_bufsize = MDS_BUFSIZE,
- .psc_max_req_size = MDS_MAXREQSIZE,
- .psc_max_reply_size = MDS_MAXREPSIZE,
- .psc_req_portal = MDS_SETATTR_PORTAL,
- .psc_rep_portal = MDC_REPLY_PORTAL,
- .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
+ .psc_nbufs = MDS_NBUFS,
+ .psc_bufsize = MDS_BUFSIZE,
+ .psc_max_req_size = MDS_MAXREQSIZE,
+ .psc_max_reply_size = MDS_MAXREPSIZE,
+ .psc_req_portal = MDS_SETATTR_PORTAL,
+ .psc_rep_portal = MDC_REPLY_PORTAL,
+ .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
.psc_min_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
- MDT_MAX_THREADS),
- .psc_max_threads = MDT_MAX_THREADS,
- .psc_ctx_tags = LCT_MD_THREAD
+ MDT_MAX_THREADS),
+ .psc_max_threads = MDT_MAX_THREADS,
+ .psc_ctx_tags = LCT_MD_THREAD
};
m->mdt_setattr_service =
ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
LUSTRE_MDT_NAME "_setattr",
- procfs_entry, NULL, "mdt_attr");
+ procfs_entry, target_print_req,"mdt_attr");
if (!m->mdt_setattr_service) {
CERROR("failed to start setattr service\n");
* sequence controller service configuration
*/
conf = (typeof(conf)) {
- .psc_nbufs = MDS_NBUFS,
- .psc_bufsize = MDS_BUFSIZE,
- .psc_max_req_size = SEQ_MAXREQSIZE,
- .psc_max_reply_size = SEQ_MAXREPSIZE,
- .psc_req_portal = SEQ_CONTROLLER_PORTAL,
- .psc_rep_portal = MDC_REPLY_PORTAL,
- .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
- .psc_min_threads = SEQ_NUM_THREADS,
- .psc_max_threads = SEQ_NUM_THREADS,
- .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+ .psc_nbufs = MDS_NBUFS,
+ .psc_bufsize = MDS_BUFSIZE,
+ .psc_max_req_size = SEQ_MAXREQSIZE,
+ .psc_max_reply_size = SEQ_MAXREPSIZE,
+ .psc_req_portal = SEQ_CONTROLLER_PORTAL,
+ .psc_rep_portal = MDC_REPLY_PORTAL,
+ .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+ .psc_min_threads = SEQ_NUM_THREADS,
+ .psc_max_threads = SEQ_NUM_THREADS,
+ .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
};
m->mdt_mdsc_service =
ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
LUSTRE_MDT_NAME"_mdsc",
- procfs_entry, NULL, "mdt_mdsc");
+ procfs_entry, target_print_req,"mdt_mdsc");
if (!m->mdt_mdsc_service) {
CERROR("failed to start seq controller service\n");
GOTO(err_mdt_svc, rc = -ENOMEM);
* metadata sequence server service configuration
*/
conf = (typeof(conf)) {
- .psc_nbufs = MDS_NBUFS,
- .psc_bufsize = MDS_BUFSIZE,
- .psc_max_req_size = SEQ_MAXREQSIZE,
- .psc_max_reply_size = SEQ_MAXREPSIZE,
- .psc_req_portal = SEQ_METADATA_PORTAL,
- .psc_rep_portal = MDC_REPLY_PORTAL,
- .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
- .psc_min_threads = SEQ_NUM_THREADS,
- .psc_max_threads = SEQ_NUM_THREADS,
- .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+ .psc_nbufs = MDS_NBUFS,
+ .psc_bufsize = MDS_BUFSIZE,
+ .psc_max_req_size = SEQ_MAXREQSIZE,
+ .psc_max_reply_size = SEQ_MAXREPSIZE,
+ .psc_req_portal = SEQ_METADATA_PORTAL,
+ .psc_rep_portal = MDC_REPLY_PORTAL,
+ .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+ .psc_min_threads = SEQ_NUM_THREADS,
+ .psc_max_threads = SEQ_NUM_THREADS,
+ .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
};
m->mdt_mdss_service =
ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
LUSTRE_MDT_NAME"_mdss",
- procfs_entry, NULL, "mdt_mdss");
+ procfs_entry, target_print_req,"mdt_mdss");
if (!m->mdt_mdss_service) {
CERROR("failed to start metadata seq server service\n");
GOTO(err_mdt_svc, rc = -ENOMEM);
* controller which manages space.
*/
conf = (typeof(conf)) {
- .psc_nbufs = MDS_NBUFS,
- .psc_bufsize = MDS_BUFSIZE,
- .psc_max_req_size = SEQ_MAXREQSIZE,
- .psc_max_reply_size = SEQ_MAXREPSIZE,
- .psc_req_portal = SEQ_DATA_PORTAL,
- .psc_rep_portal = OSC_REPLY_PORTAL,
- .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
- .psc_min_threads = SEQ_NUM_THREADS,
- .psc_max_threads = SEQ_NUM_THREADS,
- .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
+ .psc_nbufs = MDS_NBUFS,
+ .psc_bufsize = MDS_BUFSIZE,
+ .psc_max_req_size = SEQ_MAXREQSIZE,
+ .psc_max_reply_size = SEQ_MAXREPSIZE,
+ .psc_req_portal = SEQ_DATA_PORTAL,
+ .psc_rep_portal = OSC_REPLY_PORTAL,
+ .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+ .psc_min_threads = SEQ_NUM_THREADS,
+ .psc_max_threads = SEQ_NUM_THREADS,
+ .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
};
m->mdt_dtss_service =
ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
LUSTRE_MDT_NAME"_dtss",
- procfs_entry, NULL, "mdt_dtss");
+ procfs_entry, target_print_req,"mdt_dtss");
if (!m->mdt_dtss_service) {
CERROR("failed to start data seq server service\n");
GOTO(err_mdt_svc, rc = -ENOMEM);
/* FLD service start */
conf = (typeof(conf)) {
- .psc_nbufs = MDS_NBUFS,
- .psc_bufsize = MDS_BUFSIZE,
- .psc_max_req_size = FLD_MAXREQSIZE,
- .psc_max_reply_size = FLD_MAXREPSIZE,
- .psc_req_portal = FLD_REQUEST_PORTAL,
- .psc_rep_portal = MDC_REPLY_PORTAL,
- .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
- .psc_min_threads = FLD_NUM_THREADS,
- .psc_max_threads = FLD_NUM_THREADS,
- .psc_ctx_tags = LCT_DT_THREAD|LCT_MD_THREAD
+ .psc_nbufs = MDS_NBUFS,
+ .psc_bufsize = MDS_BUFSIZE,
+ .psc_max_req_size = FLD_MAXREQSIZE,
+ .psc_max_reply_size = FLD_MAXREPSIZE,
+ .psc_req_portal = FLD_REQUEST_PORTAL,
+ .psc_rep_portal = MDC_REPLY_PORTAL,
+ .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+ .psc_min_threads = FLD_NUM_THREADS,
+ .psc_max_threads = FLD_NUM_THREADS,
+ .psc_ctx_tags = LCT_DT_THREAD|LCT_MD_THREAD
};
m->mdt_fld_service =
ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
LUSTRE_MDT_NAME"_fld",
- procfs_entry, NULL, "mdt_fld");
+ procfs_entry, target_print_req, "mdt_fld");
if (!m->mdt_fld_service) {
CERROR("failed to start fld service\n");
GOTO(err_mdt_svc, rc = -ENOMEM);
* mds-mds requests be not blocked during recovery.
*/
conf = (typeof(conf)) {
- .psc_nbufs = MDS_NBUFS,
- .psc_bufsize = MDS_BUFSIZE,
- .psc_max_req_size = MDS_MAXREQSIZE,
- .psc_max_reply_size = MDS_MAXREPSIZE,
- .psc_req_portal = MDS_MDS_PORTAL,
- .psc_rep_portal = MDC_REPLY_PORTAL,
- .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
- .psc_min_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
- MDT_MAX_THREADS),
- .psc_max_threads = MDT_MAX_THREADS,
- .psc_ctx_tags = LCT_MD_THREAD
+ .psc_nbufs = MDS_NBUFS,
+ .psc_bufsize = MDS_BUFSIZE,
+ .psc_max_req_size = MDS_MAXREQSIZE,
+ .psc_max_reply_size = MDS_MAXREPSIZE,
+ .psc_req_portal = MDS_MDS_PORTAL,
+ .psc_rep_portal = MDC_REPLY_PORTAL,
+ .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+ .psc_min_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
+ MDT_MAX_THREADS),
+ .psc_max_threads = MDT_MAX_THREADS,
+ .psc_ctx_tags = LCT_MD_THREAD
};
- m->mdt_xmds_service = ptlrpc_init_svc_conf(&conf, mdt_xmds_handle,
- LUSTRE_MDT_NAME "_mds",
- procfs_entry, NULL, "mdt_xmds");
+ m->mdt_xmds_service =
+ ptlrpc_init_svc_conf(&conf, mdt_xmds_handle,
+ LUSTRE_MDT_NAME "_mds",
+ procfs_entry, target_print_req,"mdt_xmds");
if (m->mdt_xmds_service == NULL) {
CERROR("failed to start readpage service\n");
static void mdt_stack_fini(const struct lu_env *env,
struct mdt_device *m, struct lu_device *top)
{
- struct lu_device *d = top, *n;
struct obd_device *obd = m->mdt_md_dev.md_lu_dev.ld_obd;
struct lustre_cfg_bufs *bufs;
struct lustre_cfg *lcfg;
top->ld_ops->ldo_process_config(env, top, lcfg);
lustre_cfg_free(lcfg);
- lu_site_purge(env, top->ld_site, ~0);
- while (d != NULL) {
- struct obd_type *type;
- struct lu_device_type *ldt = d->ld_type;
-
- /* each fini() returns next device in stack of layers
- * * so we can avoid the recursion */
- n = ldt->ldt_ops->ldto_device_fini(env, d);
- lu_device_put(d);
- ldt->ldt_ops->ldto_device_free(env, d);
- type = ldt->ldt_obd_type;
- type->typ_refcnt--;
- class_put_type(type);
-
- /* switch to the next device in the layer */
- d = n;
- }
+ lu_stack_fini(env, top);
m->mdt_child = NULL;
m->mdt_bottom = NULL;
}
ENTRY;
ping_evictor_stop();
-
+
target_recovery_fini(obd);
mdt_stop_ptlrpc_service(m);
mdt_fs_cleanup(env, m);
- upcall_cache_cleanup(m->mdt_rmtacl_cache);
- m->mdt_rmtacl_cache = NULL;
-
upcall_cache_cleanup(m->mdt_identity_cache);
m->mdt_identity_cache = NULL;
if (m->mdt_namespace != NULL) {
- ldlm_namespace_free(m->mdt_namespace, d->ld_obd->obd_force);
+ ldlm_namespace_free(m->mdt_namespace, NULL, d->ld_obd->obd_force);
d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
}
ptlrpc_lprocfs_unregister_obd(d->ld_obd);
lprocfs_obd_cleanup(d->ld_obd);
- if (m->mdt_rootsquash_info) {
- OBD_FREE_PTR(m->mdt_rootsquash_info);
- m->mdt_rootsquash_info = NULL;
- }
+ sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
next->md_ops->mdo_init_capa_ctxt(env, next, 0, 0, 0, NULL);
- del_timer(&m->mdt_ck_timer);
+ cfs_timer_disarm(&m->mdt_ck_timer);
mdt_ck_thread_stop(m);
/* finish the stack */
mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
if (ls) {
- if (!list_empty(&ls->ls_lru) || ls->ls_total != 0) {
- /*
- * Uh-oh, objects still exist.
- */
- static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR);
-
- lu_site_print(env, ls, &cookie, lu_cdebug_printer);
- }
-
lu_site_fini(ls);
OBD_FREE_PTR(ls);
d->ld_site = NULL;
{
char *p = options;
+#ifdef CONFIG_FS_POSIX_ACL
+ /* ACLs should be enabled by default (b=13829) */
+ m->mdt_opts.mo_acl = 1;
+ LCONSOLE_INFO("Enabling ACL\n");
+#else
+ m->mdt_opts.mo_acl = 0;
+ LCONSOLE_INFO("Disabling ACL\n");
+#endif
+
if (!options)
return;
(memcmp(options, "nouser_xattr", len) == 0)) {
m->mdt_opts.mo_user_xattr = 0;
LCONSOLE_INFO("Disabling user_xattr\n");
- } else if ((len == sizeof("acl") - 1) &&
- (memcmp(options, "acl", len) == 0)) {
-#ifdef CONFIG_FS_POSIX_ACL
- m->mdt_opts.mo_acl = 1;
- LCONSOLE_INFO("Enabling ACL\n");
-#else
- m->mdt_opts.mo_acl = 0;
- CWARN("ignoring unsupported acl mount option\n");
- LCONSOLE_INFO("Disabling ACL\n");
-#endif
} else if ((len == sizeof("noacl") - 1) &&
(memcmp(options, "noacl", len) == 0)) {
m->mdt_opts.mo_acl = 0;
struct lustre_mount_info *lmi;
struct lustre_sb_info *lsi;
struct lu_site *s;
+ const char *identity_upcall = "NONE";
int rc;
ENTRY;
server_put_mount_2(dev, lmi->lmi_mnt);
}
+ m->mdt_sptlrpc_lock = RW_LOCK_UNLOCKED;
+ sptlrpc_rule_set_init(&m->mdt_sptlrpc_rset);
+
spin_lock_init(&m->mdt_ioepoch_lock);
m->mdt_opts.mo_compat_resname = 0;
m->mdt_capa_timeout = CAPA_TIMEOUT;
GOTO(err_free_site, rc);
}
- lprocfs_init_vars(mdt, &lvars);
+ lprocfs_mdt_init_vars(&lvars);
rc = lprocfs_obd_setup(obd, lvars.obd_vars);
if (rc) {
CERROR("Can't init lprocfs, rc %d\n", rc);
snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
LUSTRE_MDT_NAME"-%p", m);
- m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
+ m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name,
LDLM_NAMESPACE_SERVER,
LDLM_NAMESPACE_GREEDY);
if (m->mdt_namespace == NULL)
/* set obd_namespace for compatibility with old code */
obd->obd_namespace = m->mdt_namespace;
- m->mdt_identity_cache = upcall_cache_init(obd->obd_name,
- "NONE",
+ /* XXX: to support suppgid for ACL, we enable identity_upcall
+ * by default, otherwise, maybe got unexpected -EACCESS. */
+ if (m->mdt_opts.mo_acl)
+ identity_upcall = MDT_IDENTITY_UPCALL_PATH;
+
+ m->mdt_identity_cache = upcall_cache_init(obd->obd_name, identity_upcall,
&mdt_identity_upcall_cache_ops);
if (IS_ERR(m->mdt_identity_cache)) {
rc = PTR_ERR(m->mdt_identity_cache);
GOTO(err_free_ns, rc);
}
- m->mdt_rmtacl_cache = upcall_cache_init(obd->obd_name,
- MDT_RMTACL_UPCALL_PATH,
- &mdt_rmtacl_upcall_cache_ops);
- if (IS_ERR(m->mdt_rmtacl_cache)) {
- rc = PTR_ERR(m->mdt_rmtacl_cache);
- m->mdt_rmtacl_cache = NULL;
- GOTO(err_free_ns, rc);
- }
+ cfs_timer_init(&m->mdt_ck_timer, mdt_ck_timer_callback, m);
- m->mdt_ck_timer.function = mdt_ck_timer_callback;
- m->mdt_ck_timer.data = (unsigned long)m;
- init_timer(&m->mdt_ck_timer);
rc = mdt_ck_thread_start(m);
if (rc)
GOTO(err_free_ns, rc);
mdt_init_capa_ctxt(env, m);
+ /* Reduce the initial timeout on an MDS because it doesn't need such
+ * a long timeout as an OST does. Adaptive timeouts will adjust this
+ * value appropriately. */
if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
- ldlm_timeout = 6;
+ ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT;
RETURN(0);
target_recovery_fini(obd);
mdt_fs_cleanup(env, m);
err_capa:
- del_timer(&m->mdt_ck_timer);
+ cfs_timer_disarm(&m->mdt_ck_timer);
mdt_ck_thread_stop(m);
err_free_ns:
- upcall_cache_cleanup(m->mdt_rmtacl_cache);
- m->mdt_rmtacl_cache = NULL;
upcall_cache_cleanup(m->mdt_identity_cache);
m->mdt_identity_cache = NULL;
- ldlm_namespace_free(m->mdt_namespace, 0);
+ ldlm_namespace_free(m->mdt_namespace, NULL, 0);
obd->obd_namespace = m->mdt_namespace = NULL;
err_fini_seq:
mdt_seq_fini(env, m);
mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
err_fini_proc:
mdt_procfs_fini(m);
+ ptlrpc_lprocfs_unregister_obd(obd);
lprocfs_obd_cleanup(obd);
err_fini_site:
lu_site_fini(s);
ENTRY;
switch (cfg->lcfg_command) {
+ case LCFG_SPTLRPC_CONF: {
+ struct sptlrpc_conf_log *log;
+ struct sptlrpc_rule_set tmp_rset;
+
+ log = sptlrpc_conf_log_extract(cfg);
+ if (IS_ERR(log)) {
+ rc = PTR_ERR(log);
+ break;
+ }
+
+ sptlrpc_rule_set_init(&tmp_rset);
+
+ rc = sptlrpc_rule_set_from_log(&tmp_rset, log);
+ if (rc) {
+ CERROR("mdt %p: failed get sptlrpc rules: %d\n", m, rc);
+ break;
+ }
+
+ write_lock(&m->mdt_sptlrpc_lock);
+ sptlrpc_rule_set_free(&m->mdt_sptlrpc_rset);
+ m->mdt_sptlrpc_rset = tmp_rset;
+ write_unlock(&m->mdt_sptlrpc_lock);
+
+ sptlrpc_target_update_exp_flavor(
+ md2lu_dev(&m->mdt_md_dev)->ld_obd, &tmp_rset);
+
+ break;
+ }
case LCFG_PARAM: {
struct lprocfs_static_vars lvars;
struct obd_device *obd = d->ld_obd;
- lprocfs_init_vars(mdt, &lvars);
+ lprocfs_mdt_init_vars(&lvars);
rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, cfg, obd);
if (rc)
/* others are passed further */
return -EBADE;
}
+ if ((exp->exp_connect_flags & OBD_CONNECT_FID) == 0) {
+ CWARN("%s: MDS requires FID support, but client not\n",
+ mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
+ return -EBADE;
+ }
+
return 0;
}
static int mdt_obd_connect(const struct lu_env *env,
struct lustre_handle *conn, struct obd_device *obd,
struct obd_uuid *cluuid,
- struct obd_connect_data *data)
+ struct obd_connect_data *data,
+ void *localdata)
{
- struct mdt_client_data *mcd;
+ struct mdt_thread_info *info;
+ struct lsd_client_data *lcd;
struct obd_export *exp;
struct mdt_device *mdt;
+ struct ptlrpc_request *req;
int rc;
ENTRY;
if (!conn || !obd || !cluuid)
RETURN(-EINVAL);
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ req = info->mti_pill->rc_req;
mdt = mdt_dev(obd->obd_lu_dev);
rc = class_connect(conn, obd, cluuid);
exp = class_conn2export(conn);
LASSERT(exp != NULL);
+ CDEBUG(D_SEC, "from %s\n", sptlrpc_part2name(req->rq_sp_from));
+
+ spin_lock(&exp->exp_lock);
+ exp->exp_sp_peer = req->rq_sp_from;
+
+ read_lock(&mdt->mdt_sptlrpc_lock);
+ sptlrpc_rule_set_choose(&mdt->mdt_sptlrpc_rset, exp->exp_sp_peer,
+ req->rq_peer.nid, &exp->exp_flvr);
+ read_unlock(&mdt->mdt_sptlrpc_lock);
+
+ if (exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
+ CERROR("invalid rpc flavor %x, expect %x, from %s\n",
+ req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
+ libcfs_nid2str(req->rq_peer.nid));
+ exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
+ spin_unlock(&exp->exp_lock);
+ RETURN(-EACCES);
+ }
+ spin_unlock(&exp->exp_lock);
+
rc = mdt_connect_internal(exp, mdt, data);
if (rc == 0) {
- OBD_ALLOC_PTR(mcd);
- if (mcd != NULL) {
+ OBD_ALLOC_PTR(lcd);
+ if (lcd != NULL) {
struct mdt_thread_info *mti;
mti = lu_context_key_get(&env->le_ctx,
&mdt_thread_key);
LASSERT(mti != NULL);
mti->mti_exp = exp;
- memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
- exp->exp_mdt_data.med_mcd = mcd;
+ memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
+ exp->exp_mdt_data.med_lcd = lcd;
rc = mdt_client_new(env, mdt);
if (rc != 0) {
- OBD_FREE_PTR(mcd);
- exp->exp_mdt_data.med_mcd = NULL;
+ OBD_FREE_PTR(lcd);
+ exp->exp_mdt_data.med_lcd = NULL;
}
} else
rc = -ENOMEM;
RETURN(rc);
}
-static int mdt_obd_reconnect(struct obd_export *exp, struct obd_device *obd,
+static int mdt_obd_reconnect(const struct lu_env *env,
+ struct obd_export *exp, struct obd_device *obd,
struct obd_uuid *cluuid,
struct obd_connect_data *data)
{
- int rc;
+ struct mdt_thread_info *info;
+ struct mdt_device *mdt;
+ struct ptlrpc_request *req;
+ int rc;
ENTRY;
if (exp == NULL || obd == NULL || cluuid == NULL)
RETURN(-EINVAL);
+ info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
+ req = info->mti_pill->rc_req;
+ mdt = mdt_dev(obd->obd_lu_dev);
+
+ CDEBUG(D_SEC, "from %s\n", sptlrpc_part2name(req->rq_sp_from));
+
+ spin_lock(&exp->exp_lock);
+ if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
+ exp->exp_sp_peer = req->rq_sp_from;
+
+ read_lock(&mdt->mdt_sptlrpc_lock);
+ sptlrpc_rule_set_choose(&mdt->mdt_sptlrpc_rset,
+ exp->exp_sp_peer,
+ req->rq_peer.nid, &exp->exp_flvr);
+ read_unlock(&mdt->mdt_sptlrpc_lock);
+
+ if (exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
+ CERROR("invalid rpc flavor %x, expect %x, from %s\n",
+ req->rq_flvr.sf_rpc, exp->exp_flvr.sf_rpc,
+ libcfs_nid2str(req->rq_peer.nid));
+ exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
+ spin_unlock(&exp->exp_lock);
+ RETURN(-EACCES);
+ }
+ }
+ spin_unlock(&exp->exp_lock);
+
rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
RETURN(rc);
static int mdt_init_export(struct obd_export *exp)
{
struct mdt_export_data *med = &exp->exp_mdt_data;
+ int rc;
ENTRY;
- INIT_LIST_HEAD(&med->med_open_head);
+ CFS_INIT_LIST_HEAD(&med->med_open_head);
spin_lock_init(&med->med_open_lock);
+ sema_init(&med->med_idmap_sem, 1);
+ med->med_idmap = NULL;
spin_lock(&exp->exp_lock);
exp->exp_connecting = 1;
spin_unlock(&exp->exp_lock);
- RETURN(0);
+ rc = ldlm_init_export(exp);
+ if (rc)
+ CERROR("Error %d while initializing export\n", rc);
+ RETURN(rc);
}
static int mdt_destroy_export(struct obd_export *export)
mdt_cleanup_idmap(med);
target_destroy_export(export);
+ ldlm_destroy_export(export);
if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
RETURN(0);
switch (ev) {
case OBD_NOTIFY_CONFIG:
- mdt_allow_cli(mdt_dev(host->obd_lu_dev), (unsigned int)data);
+ mdt_allow_cli(mdt_dev(host->obd_lu_dev), (unsigned long)data);
break;
default:
CDEBUG(D_INFO, "Unhandled notification %#x\n", ev);
switch (cmd) {
case OBD_IOC_SYNC:
- rc = dt->dd_ops->dt_sync(&env, dt);
+ rc = mdt_device_sync(&env, mdt);
break;
case OBD_IOC_SET_READONLY:
rc = dt->dd_ops->dt_sync(&env, dt);
struct obd_device *obd = mdt->mdt_md_dev.md_lu_dev.ld_obd;
int rc, lost;
ENTRY;
- /* if some clients didn't participate in recovery then we can possibly
+ /* if some clients didn't participate in recovery then we can possibly
* lost sequence. Now we should increase sequence for safe value */
lost = obd->obd_max_recoverable_clients - obd->obd_connected_clients;
mdt_seq_adjust(env, mdt, lost);
-
+
rc = ld->ld_ops->ldo_recovery_complete(env, ld);
RETURN(rc);
}
RETURN(NULL);
}
-static void mdt_device_free(const struct lu_env *env, struct lu_device *d)
+static struct lu_device *mdt_device_free(const struct lu_env *env,
+ struct lu_device *d)
{
struct mdt_device *m = mdt_dev(d);
+ ENTRY;
OBD_FREE_PTR(m);
+ RETURN(NULL);
}
static struct lu_device *mdt_device_alloc(const struct lu_env *env,
int rc;
mdt_num_threads = MDT_NUM_THREADS;
- lprocfs_init_vars(mdt, &lvars);
+ lprocfs_mdt_init_vars(&lvars);
rc = class_register_type(&mdt_obd_device_ops, NULL,
lvars.module_vars, LUSTRE_MDT_NAME,
&mdt_device_type);
DEF_MDT_HNDL_F(0 |HABEO_REFERO, GETSTATUS, mdt_getstatus),
DEF_MDT_HNDL_F(HABEO_CORPUS, GETATTR, mdt_getattr),
DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
-DEF_MDT_HNDL_F(HABEO_CORPUS|MUTABOR, SETXATTR, mdt_setxattr),
DEF_MDT_HNDL_F(HABEO_CORPUS, GETXATTR, mdt_getxattr),
DEF_MDT_HNDL_F(0 |HABEO_REFERO, STATFS, mdt_statfs),
DEF_MDT_HNDL_F(0 |MUTABOR, REINT, mdt_reint),
DEF_MDT_HNDL_F(0 |HABEO_REFERO, PIN, mdt_pin),
DEF_MDT_HNDL_0(0, SYNC, mdt_sync),
DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR, mdt_is_subdir),
-DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_quotacheck_handle),
-DEF_MDT_HNDL_0(0, QUOTACTL, mdt_quotactl_handle)
+DEF_MDT_HNDL_F(0, QUOTACHECK, mdt_quotacheck_handle),
+DEF_MDT_HNDL_F(0, QUOTACTL, mdt_quotactl_handle)
};
#define DEF_OBD_HNDL(flags, name, fn) \
.mos_hs = mdt_obd_ops
},
{
+ .mos_opc_start = SEC_CTX_INIT,
+ .mos_opc_end = SEC_LAST_OPC,
+ .mos_hs = mdt_sec_ctx_ops
+ },
+ {
.mos_hs = NULL
}
};
}
};
-MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
+MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
MODULE_LICENSE("GPL");