1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Server (mds) request handler
7 * Copyright (c) 2001-2005 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
13 * This file is part of the Lustre file system, http://www.lustre.org
14 * Lustre is a trademark of Cluster File Systems, Inc.
16 * You may have signed or agreed to another license before downloading
17 * this software. If so, you are bound by the terms and conditions
18 * of that agreement, and the following does not apply to you. See the
19 * LICENSE file included with this distribution for more information.
21 * If you did not agree to a different license, then this copy of Lustre
22 * is open source software; you can redistribute it and/or modify it
23 * under the terms of version 2 of the GNU General Public License as
24 * published by the Free Software Foundation.
26 * In either case, Lustre is distributed in the hope that it will be
27 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 * license text for more details.
33 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_MDS
37 #include <lustre_mds.h>
38 #include <linux/module.h>
39 #include <linux/init.h>
40 #include <linux/random.h>
42 #include <linux/jbd.h>
43 #include <linux/ext3_fs.h>
44 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
45 # include <linux/smp_lock.h>
46 # include <linux/buffer_head.h>
47 # include <linux/workqueue.h>
48 # include <linux/mount.h>
50 # include <linux/locks.h>
53 #include <linux/lustre_acl.h>
54 #include <obd_class.h>
55 #include <lustre_dlm.h>
57 #include <lustre_fsfilt.h>
58 #include <lprocfs_status.h>
59 #include <lustre_commit_confd.h>
60 #include <lustre_quota.h>
61 #include <lustre_disk.h>
62 #include <lustre_ver.h>
64 #include "mds_internal.h"
// Module parameter: mds_num_threads is declared as an int with mode 0444.
// Its description string says it is the number of MDS service threads to start.
67 CFS_MODULE_PARM(mds_num_threads, "i", int, 0444,
68 "number of MDS service threads to start");
// Forward declarations of static functions; their definitions are not in
// this part of the file.
70 static int mds_intent_policy(struct ldlm_namespace *ns,
71 struct ldlm_lock **lockp, void *req_cookie,
72 ldlm_mode_t mode, int flags, void *data);
73 static int mds_postsetup(struct obd_device *obd);
74 static int mds_cleanup(struct obd_device *obd);
// mds_sendpage: read count bytes of file, starting at offset, into freshly
// allocated pages and push them to the client as a BULK_PUT_SOURCE bulk
// transfer attached to req.
//   req    - request whose export receives the bulk data
//   file   - open file to read from
//   offset - start offset; asserted to be page aligned
//   count  - number of bytes to send
// rc is set to -ENOMEM on allocation failure, -EIO on the read error path and
// -ETIMEDOUT on the bulk failure path. On bulk failure the client export is
// failed via class_fail_export().
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip), including the declaration of pages, several if-conditions,
// the cleanup labels and the final return.
76 /* Assumes caller has already pushed into the kernel filesystem context */
77 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
78 loff_t offset, int count)
80 struct ptlrpc_bulk_desc *desc;
81 struct l_wait_info lwi;
83 int rc = 0, npages, i, tmpcount, tmpsize = 0;
86 LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */
// Round count up to whole pages and allocate the page-pointer array.
88 npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT;
89 OBD_ALLOC(pages, sizeof(*pages) * npages);
91 GOTO(out, rc = -ENOMEM);
93 desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
96 GOTO(out_free, rc = -ENOMEM);
// First pass: allocate one page per chunk and register it with the bulk
// descriptor. tmpsize is PAGE_SIZE except possibly for the last chunk.
98 for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
99 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
101 pages[i] = alloc_pages(GFP_KERNEL, 0);
102 if (pages[i] == NULL)
103 GOTO(cleanup_buf, rc = -ENOMEM);
105 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
// Second pass: fill each page from the file. offset is passed by address to
// fsfilt_readpage.
// NOTE(review): no kunmap() for the kmap() below is visible in this listing;
// confirm it exists on an omitted line.
108 for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
109 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
110 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
111 tmpsize, offset, file->f_dentry->d_inode->i_ino,
112 file->f_dentry->d_inode->i_size);
114 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
115 kmap(pages[i]), tmpsize, &offset);
119 GOTO(cleanup_buf, rc = -EIO);
// The descriptor must now describe exactly count bytes.
122 LASSERT(desc->bd_nob == count);
124 rc = ptlrpc_start_bulk_transfer(desc);
126 GOTO(cleanup_buf, rc);
// Fault-injection hook: abort the bulk when OBD_FAIL_MDS_SENDPAGE is set.
128 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
129 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
130 OBD_FAIL_MDS_SENDPAGE, rc);
131 GOTO(abort_bulk, rc);
// Wait until the bulk is no longer active, with a timeout of
// obd_timeout * HZ / 4.
134 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
135 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
136 LASSERT (rc == 0 || rc == -ETIMEDOUT);
// Success requires both bd_success and the full byte count transferred.
139 if (desc->bd_success &&
140 desc->bd_nob_transferred == count)
141 GOTO(cleanup_buf, rc);
143 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
// Failure path: log the failure and fail (evict) the client export.
146 DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
147 (rc == -ETIMEDOUT) ? "timeout" : "network error",
148 desc->bd_nob_transferred, count,
149 req->rq_export->exp_client_uuid.uuid,
150 req->rq_export->exp_connection->c_remote_uuid.uuid);
152 class_fail_export(req->rq_export);
156 ptlrpc_abort_bulk (desc);
// Cleanup: release the pages, the bulk descriptor and the pointer array.
158 for (i = 0; i < npages; i++)
160 __free_pages(pages[i], 0);
162 ptlrpc_free_bulk(desc);
164 OBD_FREE(pages, sizeof(*pages) * npages);
// mds_fid2locked_dentry: resolve fid to a dentry with mds_fid2dentry(), then
// enqueue an LDLM_IBITS lock in mode lock_mode on the resource named by the
// inode number and generation. The lock handle is stored in lockh.
// If the enqueue does not return ELDLM_OK, retval becomes ERR_PTR(-EIO).
// NOTE(review): lockpart is used in the policy initializer but its
// declaration is on an omitted line (presumably the last parameter). The
// IS_ERR check on de, the declarations of rc and flags, and the return are
// also not visible here.
169 /* only valid locked dentries or errors should be returned */
170 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
171 struct vfsmount **mnt, int lock_mode,
172 struct lustre_handle *lockh,
175 struct mds_obd *mds = &obd->u.mds;
176 struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
177 struct ldlm_res_id res_id = { .name = {0} };
179 ldlm_policy_data_t policy = { .l_inodebits = { lockpart} };
// The lock resource name is (inode number, inode generation).
185 res_id.name[0] = de->d_inode->i_ino;
186 res_id.name[1] = de->d_inode->i_generation;
187 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
188 LDLM_IBITS, &policy, lock_mode, &flags,
189 ldlm_blocking_ast, ldlm_completion_ast,
190 NULL, NULL, NULL, 0, NULL, lockh);
191 if (rc != ELDLM_OK) {
193 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
// mds_fid2dentry: look up the dentry for fid by formatting the inode number
// as a hex name (0x...) and looking that name up under mds->mds_fid_de.
// Error returns visible here:
//   ERR_PTR(-ESTALE) - under a condition on an omitted line
//   ERR_PTR(-ENOENT) - after reading result->d_inode (condition omitted),
//                      when the inode has zero generation or zero link
//                      count, or when a non-zero requested generation does
//                      not match the inode generation
// *mnt is set to mds->mds_vfsmnt.
// NOTE(review): callers in this file pass NULL for mnt, so the store to *mnt
// is presumably guarded on an omitted line; confirm. The declarations of
// fid_name and inode, the IS_ERR check on result and the reference drops on
// the error paths are also not visible in this listing.
199 /* Look up an entry by inode number. */
200 /* this function ONLY returns valid dget'd dentries with an initialized inode
202 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
203 struct vfsmount **mnt)
206 unsigned long ino = fid->id;
207 __u32 generation = fid->generation;
209 struct dentry *result;
212 RETURN(ERR_PTR(-ESTALE));
214 snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
216 CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
217 ino, generation, mds->mds_obt.obt_sb);
219 /* under ext3 this is neither supposed to return bad inodes
221 result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
225 inode = result->d_inode;
227 RETURN(ERR_PTR(-ENOENT));
// An inode with zero generation or zero link count is reported on the
// console as possible disk corruption and rejected.
229 if (inode->i_generation == 0 || inode->i_nlink == 0) {
230 LCONSOLE_WARN("Found inode with zero generation or link -- this"
231 " may indicate disk corruption (inode: %lu/%u, "
232 "link %lu, count %d)\n", inode->i_ino,
233 inode->i_generation,(unsigned long)inode->i_nlink,
234 atomic_read(&inode->i_count));
236 RETURN(ERR_PTR(-ENOENT));
// A requested generation of 0 skips the generation comparison.
239 if (generation && inode->i_generation != generation) {
240 /* we didn't find the right inode.. */
241 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
242 "count: %d, generation %u/%u\n", inode->i_ino,
243 (unsigned long)inode->i_nlink,
244 atomic_read(&inode->i_count), inode->i_generation,
247 RETURN(ERR_PTR(-ENOENT));
251 *mnt = mds->mds_vfsmnt;
// mds_connect_internal: negotiate connection flags between the client connect
// data and this MDS, and record the result in the export.
//   exp  - export being connected
//   data - connect data from the client; modified in place
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip); any NULL check on data, the end of the CWARN call and the
// return values are not visible here.
258 static int mds_connect_internal(struct obd_export *exp,
259 struct obd_connect_data *data)
261 struct obd_device *obd = exp->exp_obd;
// Keep only the flags and inode-lock bits this MDS supports.
263 data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
264 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
266 /* If no known bits (which should not happen, probably,
267 as everybody should support LOOKUP and UPDATE bits at least)
268 revert to compat mode with plain locks. */
269 if (!data->ocd_ibits_known &&
270 data->ocd_connect_flags & OBD_CONNECT_IBITS)
271 data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
// Drop ACL and XATTR flags when the corresponding MDS flag is off.
273 if (!obd->u.mds.mds_fl_acl)
274 data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
276 if (!obd->u.mds.mds_fl_user_xattr)
277 data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
// Record the negotiated values in the export and report our version.
279 exp->exp_connect_flags = data->ocd_connect_flags;
280 data->ocd_version = LUSTRE_VERSION_CODE;
281 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
// Warn when the MDS has ACLs enabled but the negotiated flags lack ACL.
284 if (obd->u.mds.mds_fl_acl &&
285 ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
286 CWARN("%s: MDS requires ACL support but client does not\n",
// mds_reconnect: check that exp, obd and cluuid are non-NULL, then
// renegotiate connect data with mds_connect_internal(exp, data).
// NOTE(review): the value returned when an argument is NULL and the final
// return are on lines omitted from this listing.
293 static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
294 struct obd_uuid *cluuid,
295 struct obd_connect_data *data)
300 if (exp == NULL || obd == NULL || cluuid == NULL)
303 rc = mds_connect_internal(exp, data);
// mds_connect: create a new export for a connecting client.
// Visible steps: validate arguments, sample obd_abort_recovery under
// obd_processing_task_lock, call class_connect(), negotiate flags with
// mds_connect_internal(), allocate the per-client data (mcd) and copy the
// client UUID into it, then register the client with mds_client_add().
// The tail of the function frees mcd, disconnects the export and drops the
// export reference.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). The conditions guarding target_abort_recovery() and the
// cleanup calls, the store of mcd into the export data, the labels and the
// return are not visible here. ctx is not used on any visible line.
308 /* Establish a connection to the MDS.
310 * This will set up an export structure for the client to hold state data
311 * about that client, like open files, the last operation number it did
312 * on the server, etc.
314 static int mds_connect(const struct lu_context *ctx,
315 struct lustre_handle *conn, struct obd_device *obd,
316 struct obd_uuid *cluuid, struct obd_connect_data *data)
318 struct obd_export *exp;
319 struct mds_export_data *med;
320 struct mds_client_data *mcd = NULL;
321 int rc, abort_recovery;
324 if (!conn || !obd || !cluuid)
327 /* Check for aborted recovery. */
328 spin_lock_bh(&obd->obd_processing_task_lock);
329 abort_recovery = obd->obd_abort_recovery;
330 spin_unlock_bh(&obd->obd_processing_task_lock);
332 target_abort_recovery(obd);
334 /* XXX There is a small race between checking the list and adding a
335 * new connection for the same UUID, but the real threat (list
336 * corruption when multiple different clients connect) is solved.
338 * There is a second race between adding the export to the list,
339 * and filling in the client data below. Hence skipping the case
340 * of NULL mcd above. We should already be controlling multiple
341 * connects at the client, and we can't hold the spinlock over
342 * memory allocations without risk of deadlocking.
344 rc = class_connect(conn, obd, cluuid);
347 exp = class_conn2export(conn);
349 med = &exp->exp_mds_data;
351 rc = mds_connect_internal(exp, data);
355 OBD_ALLOC(mcd, sizeof(*mcd));
357 GOTO(out, rc = -ENOMEM);
// Only sizeof(mcd->mcd_uuid) bytes of the client UUID are copied.
359 memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
362 rc = mds_client_add(obd, &obd->u.mds, med, -1);
368 OBD_FREE(mcd, sizeof(*mcd));
371 class_disconnect(exp);
373 class_export_put(exp);
// mds_init_export: initialise the MDS-specific part of a new export: an empty
// open-file list and its spinlock. Also marks the export as connecting.
// NOTE(review): the return statement is on a line omitted from this listing.
379 int mds_init_export(struct obd_export *exp)
381 struct mds_export_data *med = &exp->exp_mds_data;
383 INIT_LIST_HEAD(&med->med_open_head);
384 spin_lock_init(&med->med_open_lock);
385 exp->exp_connecting = 1;
// mds_destroy_export: tear down an export. Calls target_destroy_export(),
// then force-closes every file handle still on the export open list, and
// finally releases the client data with mds_client_free().
// The open list is walked under med_open_lock. The lock is dropped around
// each close and retaken before the list is examined again.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). The action taken when the client UUID equals the obd UUID,
// the condition before the error CDEBUG, the declaration of rc and the
// return are not visible here.
389 static int mds_destroy_export(struct obd_export *export)
391 struct mds_export_data *med;
392 struct obd_device *obd = export->exp_obd;
393 struct lvfs_run_ctxt saved;
397 med = &export->exp_mds_data;
398 target_destroy_export(export);
400 if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
403 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
404 /* Close any open files (which may also cause orphan unlinking). */
405 spin_lock(&med->med_open_lock);
// Always take the first entry; mds_mfd_unlink is relied on to remove it so
// that the loop makes progress.
406 while (!list_empty(&med->med_open_head)) {
407 struct list_head *tmp = med->med_open_head.next;
408 struct mds_file_data *mfd =
409 list_entry(tmp, struct mds_file_data, mfd_list);
410 struct dentry *dentry = mfd->mfd_dentry;
412 /* Remove mfd handle so it can't be found again.
413 * We are consuming the mfd_list reference here. */
414 mds_mfd_unlink(mfd, 0);
415 spin_unlock(&med->med_open_lock);
417 /* If you change this message, be sure to update
418 * replay_single:test_46 */
419 CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for "
420 "%.*s (ino %lu)\n", obd->obd_name, dentry->d_name.len,
421 dentry->d_name.name, dentry->d_inode->i_ino);
422 /* child orphan sem protects orphan_dec_test and
423 * is_orphan race, mds_mfd_close drops it */
424 MDS_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode);
// The last argument is non-zero unless the export has OBD_OPT_FAILOVER set.
425 rc = mds_mfd_close(NULL, MDS_REQ_REC_OFF, obd, mfd,
426 !(export->exp_flags & OBD_OPT_FAILOVER));
429 CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
430 spin_lock(&med->med_open_lock);
432 spin_unlock(&med->med_open_lock);
433 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
434 mds_client_free(export);
// mds_disconnect: disconnect a client export. Holds an extra export reference
// for the duration, disconnects first, cancels the locks held for the export,
// and then schedules every outstanding difficult reply.
// NOTE(review): the declaration of rc and the return are on lines omitted
// from this listing.
439 static int mds_disconnect(struct obd_export *exp)
441 unsigned long irqflags;
446 class_export_get(exp);
448 /* Disconnect early so that clients can't keep using export */
449 rc = class_disconnect(exp);
450 ldlm_cancel_locks_for_export(exp);
452 /* complete all outstanding replies */
// exp_lock is taken with interrupts saved; the service srv_lock nests
// inside it for each reply state.
453 spin_lock_irqsave(&exp->exp_lock, irqflags);
454 while (!list_empty(&exp->exp_outstanding_replies)) {
455 struct ptlrpc_reply_state *rs =
456 list_entry(exp->exp_outstanding_replies.next,
457 struct ptlrpc_reply_state, rs_exp_list);
458 struct ptlrpc_service *svc = rs->rs_service;
460 spin_lock(&svc->srv_lock);
461 list_del_init(&rs->rs_exp_list);
462 ptlrpc_schedule_difficult_reply(rs);
463 spin_unlock(&svc->srv_lock);
465 spin_unlock_irqrestore(&exp->exp_lock, irqflags);
467 class_export_put(exp);
// mds_getstatus: reply with the root fid of this MDS. Packs a single reply
// buffer holding an mds_body and copies mds->mds_rootfid into body->fid1.
// If packing fails, or the OBD_FAIL_MDS_GETSTATUS_PACK fault is set, it logs
// an error and sets rq_status to -ENOMEM.
// NOTE(review): the return statements are on lines omitted from this listing.
471 static int mds_getstatus(struct ptlrpc_request *req)
473 struct mds_obd *mds = mds_req2mds(req);
474 struct mds_body *body;
475 int rc, size = sizeof(*body);
478 rc = lustre_pack_reply(req, 1, &size, NULL);
479 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
480 CERROR("mds: out of memory for message: size=%d\n", size);
481 req->rq_status = -ENOMEM; /* superfluous? */
485 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
486 memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
488 /* the last_committed and last_xid fields are filled in for all
489 * replies already - no need to do so here also.
// mds_get_md: read the lov extended attribute of inode into md using
// fsfilt_get_md(), then pass the result to mds_convert_lov_ea().
// The inode mutex is taken and released around the work.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). The rest of the parameter list (size and, judging by the
// call in mds_pack_md, a lock flag), whether the mutex calls are conditional,
// the declaration of lmm_size, the update of *size and the return are not
// visible here.
494 /* get the LOV EA from @inode and store it into @md. It can be at most
495 * @size bytes, and @size is updated with the actual EA size.
496 * The EA size is also returned on success, and -ve errno on failure.
497 * If there is no EA then 0 is returned. */
498 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
505 LOCK_INODE_MUTEX(inode);
506 rc = fsfilt_get_md(obd, inode, md, *size, "lov");
509 CERROR("Error %d reading eadata for ino %lu\n",
513 rc = mds_convert_lov_ea(obd, inode, md, lmm_size);
525 UNLOCK_INODE_MUTEX(inode);
// mds_pack_md: fill reply buffer number offset of msg with the MD (EA data)
// of inode and record the result in body.
// The space available is taken from msg->buflens[offset]. A size above
// mds->mds_max_mdsize produces a warning. The data is read with mds_get_md().
// body->valid gets OBD_MD_FLDIREA for directories and OBD_MD_FLEASIZE is also
// set; body->eadatasize is set to lmm_size.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). The declarations of lmm, lmm_size and rc, the NULL check on
// lmm, the conditions on the result of mds_get_md, and the returns are not
// visible. The line between the two valid updates is presumably an else;
// confirm against the full source.
531 /* Call with lock=1 if you want mds_pack_md to take the i_mutex.
532 * Call with lock=0 if the caller has already taken the i_mutex. */
533 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
534 struct mds_body *body, struct inode *inode, int lock)
536 struct mds_obd *mds = &obd->u.mds;
542 lmm = lustre_msg_buf(msg, offset, 0);
544 /* Some problem with getting eadata when I sized the reply
546 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
550 lmm_size = msg->buflens[offset];
552 /* I don't really like this, but it is a sanity check on the client
553 * MD request. However, if the client doesn't know how much space
554 * to reserve for the MD, it shouldn't be bad to have too much space.
556 if (lmm_size > mds->mds_max_mdsize) {
557 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
558 inode->i_ino, lmm_size, mds->mds_max_mdsize);
562 rc = mds_get_md(obd, inode, lmm, &lmm_size, lock);
564 if (S_ISDIR(inode->i_mode))
565 body->valid |= OBD_MD_FLDIREA;
567 body->valid |= OBD_MD_FLEASIZE;
568 body->eadatasize = lmm_size;
// mds_pack_posix_acl: when CONFIG_FS_POSIX_ACL is defined, read the access
// ACL xattr (XATTR_NAME_ACL_ACCESS) of inode into reply buffer repoff of
// repmsg through inode->i_op->getxattr. The call uses a temporary on-stack
// dentry that only has d_inode set.
// repbody->aclsize is set to rc, any rc other than -ENODATA on the else
// branch is logged, and OBD_MD_FLACL is set in repbody->valid.
// The macro on the last line replaces the function with the constant 0.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). The declarations of buflen and rc, the early returns, the
// condition before the aclsize store, and the preprocessor lines that select
// the macro (presumably an else branch) are not visible here.
575 #ifdef CONFIG_FS_POSIX_ACL
577 int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg,
578 struct mds_body *repbody, int repoff)
580 struct dentry de = { .d_inode = inode };
// The caller must not have packed an ACL yet and the reply must contain
// buffer repoff.
584 LASSERT(repbody->aclsize == 0);
585 LASSERT(repmsg->bufcount > repoff);
587 buflen = lustre_msg_buflen(repmsg, repoff);
591 if (!inode->i_op || !inode->i_op->getxattr)
595 rc = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS,
596 lustre_msg_buf(repmsg, repoff, buflen),
601 repbody->aclsize = rc;
602 else if (rc != -ENODATA) {
603 CERROR("buflen %d, get acl: %d\n", buflen, rc);
608 repbody->valid |= OBD_MD_FLACL;
612 #define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0
// mds_pack_acl: wrapper that forwards to mds_pack_posix_acl(). The med
// argument is not used on any visible line.
// NOTE(review): the last parameter (repoff) is declared on a line omitted
// from this listing.
615 int mds_pack_acl(struct mds_export_data *med, struct inode *inode,
616 struct lustre_msg *repmsg, struct mds_body *repbody,
619 return mds_pack_posix_acl(inode, repmsg, repbody, repoff);
// mds_getattr_internal: fill an already-packed reply with the attributes of
// the inode behind dentry.
//   obd       - MDS device
//   dentry    - object to describe
//   req       - request; the reply message must already be allocated
//   reqbody   - request body; its valid bits select optional data
//   reply_off - index of the reply buffer holding the mds_body
// Optional data, chosen by reqbody->valid and the inode type: the MD (EA) for
// regular files and directories, the link target for symlinks, the maximum
// MD and cookie sizes, and the ACL when the export negotiated
// OBD_CONNECT_ACL.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). Visible calls keep passing reply_off although the optional
// buffers presumably follow the body, so an increment of reply_off is likely
// on omitted lines; confirm. Declarations of rc and len, several conditions
// and the return are also not visible.
622 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
623 struct ptlrpc_request *req,
624 struct mds_body *reqbody, int reply_off)
626 struct mds_body *body;
627 struct inode *inode = dentry->d_inode;
634 body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
635 LASSERT(body != NULL); /* caller prepped reply */
// Basic attributes: fid and inode fields.
637 mds_pack_inode2fid(&body->fid1, inode);
638 mds_pack_inode2body(body, inode);
641 if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
642 (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
643 rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body,
646 /* If we have LOV EA data, the OST holds size, atime, mtime */
647 if (!(body->valid & OBD_MD_FLEASIZE) &&
648 !(body->valid & OBD_MD_FLDIREA))
649 body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
650 OBD_MD_FLATIME | OBD_MD_FLMTIME);
// Shrink the reply buffer to the EA size actually packed.
652 lustre_shrink_reply(req, reply_off, body->eadatasize, 0);
653 if (body->eadatasize)
655 } else if (S_ISLNK(inode->i_mode) &&
656 (reqbody->valid & OBD_MD_LINKNAME) != 0) {
657 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0);
660 LASSERT (symname != NULL); /* caller prepped reply */
661 len = req->rq_repmsg->buflens[reply_off];
// readlink is expected to return len - 1 bytes; the terminating NUL is
// written afterwards and counted in eadatasize.
663 rc = inode->i_op->readlink(dentry, symname, len);
665 CERROR("readlink failed: %d\n", rc);
666 } else if (rc != len - 1) {
667 CERROR ("Unexpected readlink rc %d: expecting %d\n",
671 CDEBUG(D_INODE, "read symlink dest %s\n", symname);
672 body->valid |= OBD_MD_LINKNAME;
673 body->eadatasize = rc + 1;
674 symname[rc] = 0; /* NULL terminate */
// Report the maximum MD and cookie sizes when asked.
680 if (reqbody->valid & OBD_MD_FLMODEASIZE) {
681 struct mds_obd *mds = mds_req2mds(req);
682 body->max_cookiesize = mds->mds_max_cookiesize;
683 body->max_mdsize = mds->mds_max_mdsize;
684 body->valid |= OBD_MD_FLMODEASIZE;
// The ACL is packed only if the export negotiated ACL support and the
// request asked for it.
690 #ifdef CONFIG_FS_POSIX_ACL
691 if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
692 (reqbody->valid & OBD_MD_FLACL)) {
693 rc = mds_pack_acl(&req->rq_export->exp_mds_data,
694 inode, req->rq_repmsg,
697 lustre_shrink_reply(req, reply_off, body->aclsize, 0);
// mds_getattr_pack_msg: work out how many reply buffers a getattr needs and
// how large they are, then allocate the reply with lustre_pack_reply().
// The first buffer always holds an mds_body. Further buffers are sized for
// the MD (EA), the symlink target or the ACL, depending on the inode type and
// the valid bits of the request body found at offset.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). The offset parameter declaration, the stores into
// size[bufcount] for the MD and ACL cases, the increments of bufcount, the
// condition before the CDEBUG in the symlink branch and the returns are not
// visible here.
706 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
709 struct mds_obd *mds = mds_req2mds(req);
710 struct mds_body *body;
711 int rc, size[3] = {sizeof(*body)}, bufcount = 1;
714 body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
715 LASSERT(body != NULL); /* checked by caller */
716 LASSERT_REQSWABBED(req, offset); /* swabbed by caller */
718 if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
719 (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
// Query with a NULL buffer and zero length; the result is logged below as
// the number of MD bytes.
720 LOCK_INODE_MUTEX(inode);
721 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0,
723 UNLOCK_INODE_MUTEX(inode);
724 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
727 if (rc != -ENODATA) {
728 CERROR("error getting inode %lu MD: rc = %d\n",
733 } else if (rc > mds->mds_max_mdsize) {
735 CERROR("MD size %d larger than maximum possible %u\n",
736 rc, mds->mds_max_mdsize);
741 } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
// The symlink buffer is the smaller of i_size + 1 and the space the client
// said it reserved (eadatasize); a mismatch is logged.
742 if (inode->i_size + 1 != body->eadatasize)
743 CERROR("symlink size: %Lu, reply space: %d\n",
744 inode->i_size + 1, body->eadatasize);
745 size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
747 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
748 inode->i_size + 1, body->eadatasize);
751 #ifdef CONFIG_FS_POSIX_ACL
752 if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
753 (body->valid & OBD_MD_FLACL)) {
754 struct dentry de = { .d_inode = inode };
757 if (inode->i_op && inode->i_op->getxattr) {
759 rc = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS,
764 if (rc != -ENODATA) {
765 CERROR("got acl size: %d\n", rc);
// Fault-injection hook for reply packing.
775 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
776 CERROR("failed MDS_GETATTR_PACK test\n");
777 req->rq_status = -ENOMEM;
781 rc = lustre_pack_reply(req, bufcount, size, NULL);
783 CERROR("lustre_pack_reply failed: rc %d\n", rc);
// mds_getattr_name: getattr of a child found by name under a parent, or by
// fid for revalidation. Handles both plain and intent requests: offset must
// be MDS_REQ_REC_OFF or MDS_REQ_INTENT_REC_OFF.
//   offset      - index of the request buffer holding the mds_body; the name
//                 is in buffer offset + 1
//   req         - the request
//   child_part  - not used on any visible line (presumably the inode lock
//                 bits for the child; confirm)
//   child_lockh - lock handle for the child; if it is already in use the
//                 request must carry MSG_RESENT
// Outline of the visible code:
//   1. swab the body, unpack the name, set up credentials
//   2. push the filesystem context and mark DISP_LOOKUP_EXECD
//   3. if not a resend, take locks and find the child, by name with
//      mds_get_parent_child_locked or by fid with mds_fid2locked_dentry
//   4. if a resend, recover the child fid from the already granted lock
//   5. a negative dentry gives DISP_LOOKUP_NEG and -ENOENT, otherwise
//      DISP_LOOKUP_POS
//   6. pack the reply if needed and fill it with mds_getattr_internal
//   7. cleanup by phase: drop locks, pop the context, release credentials,
//      and make sure some reply is packed
// NOTE(review): this listing omits many original lines (the embedded line
// numbers skip). The declarations of name and namesize, most error checks
// and their returns, the store into resent_req, the condition choosing
// between lookup by name and by fid, the cleanup label and case labels, and
// the final return are not visible here.
791 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
792 int child_part, struct lustre_handle *child_lockh)
794 struct obd_device *obd = req->rq_export->exp_obd;
795 struct mds_obd *mds = &obd->u.mds;
796 struct ldlm_reply *rep = NULL;
797 struct lvfs_run_ctxt saved;
798 struct mds_body *body;
799 struct dentry *dparent = NULL, *dchild = NULL;
800 struct lvfs_ucred uc = {NULL,};
801 struct lustre_handle parent_lockh;
803 int rc = 0, cleanup_phase = 0, resent_req = 0;
807 LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
809 /* Swab now, before anyone looks inside the request */
811 body = lustre_swab_reqbuf(req, offset, sizeof(*body),
812 lustre_swab_mds_body);
814 CERROR("Can't swab mds_body\n");
818 LASSERT_REQSWAB(req, offset + 1);
819 name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
821 CERROR("Can't unpack name\n");
824 namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
826 rc = mds_init_ucred(&uc, req, offset);
830 LASSERT (offset == MDS_REQ_REC_OFF || offset == MDS_REQ_INTENT_REC_OFF);
831 /* if requests were at offset 2, the getattr reply goes back at 1 */
832 if (offset == MDS_REQ_INTENT_REC_OFF) {
833 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
837 push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
838 cleanup_phase = 1; /* kernel context */
839 intent_set_disposition(rep, DISP_LOOKUP_EXECD);
// NOTE(review): the raw lookup code below uses dir and inum, which are not
// declared on any visible line. It may be compiled out by preprocessor lines
// omitted from this listing; confirm.
841 /* FIXME: handle raw lookup */
843 if (body->valid == OBD_MD_FLID) {
844 struct mds_body *mds_reply;
845 int size = sizeof(*mds_reply);
847 // The user requested ONLY the inode number, so do a raw lookup
848 rc = lustre_pack_reply(req, 1, &size, NULL);
850 CERROR("out of memory\n");
854 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
856 mds_reply = lustre_msg_buf(req->rq_repmsg, offset,
858 mds_reply->fid1.id = inum;
859 mds_reply->valid = OBD_MD_FLID;
// A lock handle that is already in use is only legal on a resent request.
864 if (lustre_handle_is_used(child_lockh)) {
865 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
869 if (resent_req == 0) {
871 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
872 &parent_lockh, &dparent,
874 MDS_INODELOCK_UPDATE,
876 child_lockh, &dchild, LCK_CR,
879 /* For revalidate by fid we always take UPDATE lock */
880 dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
882 MDS_INODELOCK_UPDATE);
885 rc = PTR_ERR(dchild);
// Resent request: the lock was granted earlier, so rebuild the child fid
// from the resource name of that lock instead of enqueuing again.
890 struct ldlm_lock *granted_lock;
891 struct ll_fid child_fid;
892 struct ldlm_resource *res;
893 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
894 granted_lock = ldlm_handle2lock(child_lockh);
895 LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
896 body->fid1.id, body->fid1.generation,
897 child_lockh->cookie);
900 res = granted_lock->l_resource;
901 child_fid.id = res->lr_name.name[0];
902 child_fid.generation = res->lr_name.name[1];
903 dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL);
904 LASSERT(!IS_ERR(dchild));
905 LDLM_LOCK_PUT(granted_lock);
908 cleanup_phase = 2; /* dchild, dparent, locks */
910 if (dchild->d_inode == NULL) {
911 intent_set_disposition(rep, DISP_LOOKUP_NEG);
912 /* in the intent case, the policy clears this error:
913 the disposition is enough */
914 GOTO(cleanup, rc = -ENOENT);
916 intent_set_disposition(rep, DISP_LOOKUP_POS);
919 if (req->rq_repmsg == NULL) {
920 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
922 CERROR ("mds_getattr_pack_msg: %d\n", rc);
927 rc = mds_getattr_internal(obd, dchild, req, body, offset);
928 GOTO(cleanup, rc); /* returns the lock to the client */
// Cleanup. The child lock reference is dropped only on error with a
// positive child; the parent lock reference is dropped whenever the locks
// were taken by this call (not a resend).
931 switch (cleanup_phase) {
933 if (resent_req == 0) {
934 if (rc && dchild->d_inode)
935 ldlm_lock_decref(child_lockh, LCK_CR);
936 ldlm_lock_decref(&parent_lockh, LCK_CR);
941 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
943 mds_exit_ucred(&uc, mds);
// Make sure a reply exists even when an early failure skipped packing.
944 if (req->rq_reply_state == NULL) {
946 lustre_pack_reply(req, 0, NULL, NULL);
// mds_getattr: getattr by fid. Swabs the request body at offset, sets up
// credentials, resolves body->fid1 to a dentry, sizes and packs the reply
// with mds_getattr_pack_msg(), and fills it with mds_getattr_internal() using
// reply buffer 0. rq_status carries the lookup error or the result of
// mds_getattr_internal().
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). The declarations of de and rc, the error checks and jumps,
// the release of the dentry and the return are not visible here.
952 static int mds_getattr(struct ptlrpc_request *req, int offset)
954 struct mds_obd *mds = mds_req2mds(req);
955 struct obd_device *obd = req->rq_export->exp_obd;
956 struct lvfs_run_ctxt saved;
958 struct mds_body *body;
959 struct lvfs_ucred uc = {NULL,};
963 body = lustre_swab_reqbuf(req, offset, sizeof(*body),
964 lustre_swab_mds_body);
968 rc = mds_init_ucred(&uc, req, offset);
972 push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
973 de = mds_fid2dentry(mds, &body->fid1, NULL);
975 rc = req->rq_status = PTR_ERR(de);
979 rc = mds_getattr_pack_msg(req, de->d_inode, offset);
981 CERROR("mds_getattr_pack_msg: %d\n", rc);
985 req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
990 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
// Make sure a reply exists even when an early failure skipped packing.
992 if (req->rq_reply_state == NULL) {
994 lustre_pack_reply(req, 0, NULL, NULL);
996 mds_exit_ucred(&uc, mds);
// mds_obd_statfs: under obd_osfs_lock, call fsfilt_statfs() for the backing
// superblock with the given max_age, then copy obd->obd_osfs into osfs.
// NOTE(review): the declaration of rc, any condition guarding the memcpy and
// the return are on lines omitted from this listing.
1000 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1001 unsigned long max_age)
1005 spin_lock(&obd->obd_osfs_lock);
1006 rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age);
1008 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
1009 spin_unlock(&obd->obd_osfs_lock);
// mds_statfs: handle a statfs request. Packs one reply buffer sized for
// struct obd_statfs, fills it through mds_obd_statfs(), and stores the result
// code in rq_status.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). The last argument of the mds_obd_statfs call, the jumps on
// error and the return are not visible here.
1014 static int mds_statfs(struct ptlrpc_request *req)
1016 struct obd_device *obd = req->rq_export->exp_obd;
1017 int rc, size = sizeof(struct obd_statfs);
1020 /* This will trigger a watchdog timeout */
1021 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
1022 (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
1024 rc = lustre_pack_reply(req, 1, &size, NULL);
1025 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
1026 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
1030 /* We call this so that we can cache a bit - 1 jiffie worth */
1031 rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size),
1034 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1040 req->rq_status = rc;
// mds_sync: handle a sync request. A fid id of 0 syncs the whole filesystem
// with fsfilt_sync(). Otherwise the inode named by body->fid1 is synced
// through its fsync file operation, and its fid and attributes are packed
// into the reply body. The result is stored in rq_status.
// NOTE(review): every visible line reads the request body from buffer 0 and
// the offset parameter is not used on any visible line; confirm whether that
// is intended. This listing also omits the declaration of de, the error
// checks, the release of the dentry, the out label and the return.
1044 static int mds_sync(struct ptlrpc_request *req, int offset)
1046 struct obd_device *obd = req->rq_export->exp_obd;
1047 struct mds_obd *mds = &obd->u.mds;
1048 struct mds_body *body;
1049 int rc, size = sizeof(*body);
1052 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
1054 GOTO(out, rc = -EFAULT);
1056 rc = lustre_pack_reply(req, 1, &size, NULL);
1057 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
1058 CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
1062 if (body->fid1.id == 0) {
1063 /* a fid of zero is taken to mean "sync whole filesystem" */
1064 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
1069 de = mds_fid2dentry(mds, &body->fid1, NULL);
1071 GOTO(out, rc = PTR_ERR(de));
1073 /* The file parameter isn't used for anything */
1074 if (de->d_inode->i_fop && de->d_inode->i_fop->fsync)
1075 rc = de->d_inode->i_fop->fsync(NULL, de, 1);
// From here on body points at the reply buffer, not the request.
1077 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
1078 mds_pack_inode2fid(&body->fid1, de->d_inode);
1079 mds_pack_inode2body(body, de->d_inode);
1086 req->rq_status = rc;
// mds_readpage: handle a readpage request. Packs a reply holding an mds_body,
// resolves body->fid1 to a dentry, opens it read-only, checks the request,
// stores the inode size in the reply and sends the data with mds_sendpage().
// In the request body, size carries the byte offset and nlink carries the
// number of bytes to read. Both must be multiples of the inode block size,
// otherwise rc is -EFAULT. The result is stored in rq_status.
// NOTE(review): mds_init_ucred is called with buffer index 0 while the body
// is read from buffer offset; confirm this is intended. This listing also
// omits the declarations of de and file, the IS_ERR checks, the labels and
// the return.
1090 /* mds_readpage does not take a DLM lock on the inode, because the client must
1091 * already have a PR lock.
1093 * If we were to take another one here, a deadlock will result, if another
1094 * thread is already waiting for a PW lock. */
1095 static int mds_readpage(struct ptlrpc_request *req, int offset)
1097 struct obd_device *obd = req->rq_export->exp_obd;
1098 struct mds_obd *mds = &obd->u.mds;
1099 struct vfsmount *mnt;
1102 struct mds_body *body, *repbody;
1103 struct lvfs_run_ctxt saved;
1104 int rc, size = sizeof(*repbody);
1105 struct lvfs_ucred uc = {NULL,};
1108 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
1111 rc = lustre_pack_reply(req, 1, &size, NULL);
1113 CERROR("error packing readpage reply: rc %d\n", rc);
1117 body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1118 lustre_swab_mds_body);
1120 GOTO (out, rc = -EFAULT);
1122 rc = mds_init_ucred(&uc, req, 0);
1126 push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1127 de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt);
1129 GOTO(out_pop, rc = PTR_ERR(de));
1131 CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1133 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1134 /* note: in case of an error, dentry_open puts dentry */
1136 GOTO(out_pop, rc = PTR_ERR(file));
1138 /* body->size is actually the offset -eeb */
1139 if ((body->size & (de->d_inode->i_blksize - 1)) != 0) {
1140 CERROR("offset "LPU64" not on a block boundary of %lu\n",
1141 body->size, de->d_inode->i_blksize);
1142 GOTO(out_file, rc = -EFAULT);
1145 /* body->nlink is actually the #bytes to read -eeb */
1146 if (body->nlink & (de->d_inode->i_blksize - 1)) {
1147 CERROR("size %u is not multiple of blocksize %lu\n",
1148 body->nlink, de->d_inode->i_blksize);
1149 GOTO(out_file, rc = -EFAULT);
// Tell the client the current size of the inode.
1152 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
1153 repbody->size = file->f_dentry->d_inode->i_size;
1154 repbody->valid = OBD_MD_FLSIZE;
1156 /* to make this asynchronous make sure that the handling function
1157 doesn't send a reply when this function completes. Instead a
1158 callback function would send the reply */
1159 /* body->size is actually the offset -eeb */
1160 rc = mds_sendpage(req, file, body->size, body->nlink);
1163 filp_close(file, 0);
1165 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1167 mds_exit_ucred(&uc, mds);
1168 req->rq_status = rc;
// mds_reint: handle a reintegration (update) request. Allocates an update
// record on the heap, unpacks the request into it with mds_update_unpack(),
// runs it with mds_reint_rec() and frees the record.
// If unpacking fails, or the OBD_FAIL_MDS_REINT_UNPACK fault is set,
// rq_status is set to -EINVAL.
// NOTE(review): the declaration of rc, the check on the allocation, the out
// label and the return are on lines omitted from this listing.
1172 int mds_reint(struct ptlrpc_request *req, int offset,
1173 struct lustre_handle *lockh)
1175 struct mds_update_record *rec; /* 116 bytes on the stack? no sir! */
1178 OBD_ALLOC(rec, sizeof(*rec));
1182 rc = mds_update_unpack(req, offset, rec);
1183 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1184 CERROR("invalid record\n");
1185 GOTO(out, req->rq_status = -EINVAL);
1188 /* rc will be used to interrupt a for loop over multiple records */
1189 rc = mds_reint_rec(rec, offset, req, lockh);
1191 OBD_FREE(rec, sizeof(*rec));
// mds_filter_recovery_request: decide by opcode what to do with a request
// that arrives during recovery. The result of the decision is written to
// *process. One group of opcodes stores the result of
// target_queue_recovery_request(); unlisted opcodes are rejected with
// rq_status -EAGAIN and an error reply.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). Some case labels, the value stored in *process for the
// connect, disconnect and sync group, the default label and the returns are
// not visible here.
1195 int mds_filter_recovery_request(struct ptlrpc_request *req,
1196 struct obd_device *obd, int *process)
1198 switch (req->rq_reqmsg->opc) {
1199 case MDS_CONNECT: /* This will never get here, but for completeness. */
1200 case OST_CONNECT: /* This will never get here, but for completeness. */
1201 case MDS_DISCONNECT:
1202 case OST_DISCONNECT:
1207 case MDS_SYNC: /* used in unmounting */
1211 *process = target_queue_recovery_request(req, obd);
1215 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1217 /* XXX what should we set rq_status to here? */
1218 req->rq_status = -EAGAIN;
1219 RETURN(ptlrpc_error(req));
1222 EXPORT_SYMBOL(mds_filter_recovery_request);
// reint_names: table of printable names indexed by REINT_* opcode. The
// entries use the older designated-initializer form without an equals sign.
// NOTE(review): the closing line of the initializer is omitted from this
// listing.
1224 static char *reint_names[] = {
1225 [REINT_SETATTR] "setattr",
1226 [REINT_CREATE] "create",
1227 [REINT_LINK] "link",
1228 [REINT_UNLINK] "unlink",
1229 [REINT_RENAME] "rename",
1230 [REINT_OPEN] "open",
// mds_set_info_rpc: handle a set_info request. Request buffer 0 holds the key
// and buffer 1 holds the value. The only key compared on visible lines is
// read-only, which sets or clears OBD_CONNECT_RDONLY in the export connect
// flags.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). The declarations of key, val, keylen and rc, the NULL
// checks, the condition choosing between setting and clearing the flag, and
// the returns are not visible here.
1233 static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req)
1240 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
1242 DEBUG_REQ(D_HA, req, "no set_info key");
1245 keylen = req->rq_reqmsg->buflens[0];
1247 val = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*val));
1249 DEBUG_REQ(D_HA, req, "no set_info val");
1253 rc = lustre_pack_reply(req, 0, NULL, NULL);
1256 req->rq_repmsg->status = 0;
// NOTE(review): keylen comes from the request. The check only rejects keys
// shorter than the literal, so a longer key makes memcmp compare keylen bytes
// against a 10-byte string literal and may read past its end. Consider
// comparing at most the literal length.
1258 if (keylen < strlen("read-only") ||
1259 memcmp(key, "read-only", keylen) != 0)
1263 exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
1265 exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
// mds_handle_quotacheck: handle a quotacheck request. Swabs the obd_quotactl
// from request buffer 0, packs an empty reply, and stores the result of
// obd_quotacheck() in rq_status.
// NOTE(review): the declaration of rc, the NULL check on oqctl, the condition
// before the CERROR and the returns are on lines omitted from this listing.
1270 static int mds_handle_quotacheck(struct ptlrpc_request *req)
1272 struct obd_quotactl *oqctl;
1276 oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl),
1277 lustre_swab_obd_quotactl);
1281 rc = lustre_pack_reply(req, 0, NULL, NULL);
1283 CERROR("mds: out of memory while packing quotacheck reply\n");
1287 req->rq_status = obd_quotacheck(req->rq_export, oqctl);
// mds_handle_quotactl: handle a quotactl request. Swabs the obd_quotactl from
// request buffer 0, packs a reply with one buffer of the same size, and
// stores the result of obd_quotactl() in rq_status.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). The error checks, any copy of the result into repoqc and the
// returns are not visible here.
1291 static int mds_handle_quotactl(struct ptlrpc_request *req)
1293 struct obd_quotactl *oqctl, *repoqc;
1294 int rc, size = sizeof(*repoqc);
1297 oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl),
1298 lustre_swab_obd_quotactl);
1302 rc = lustre_pack_reply(req, 1, &size, NULL);
1306 repoqc = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repoqc));
1308 req->rq_status = obd_quotactl(req->rq_export, oqctl);
// mds_msg_check_version: check that the version of msg matches the protocol
// family of its opcode. Opcodes are grouped and checked against
// LUSTRE_OBD_VERSION, LUSTRE_MDS_VERSION, LUSTRE_DLM_VERSION or
// LUSTRE_LOG_VERSION. A mismatch is logged with the opcode and both versions.
// An opcode in none of the groups is logged as unknown.
// NOTE(review): this listing omits some original lines (the embedded line
// numbers skip). The switch statement itself, many case labels, the
// conditions before each CERROR, the value returned for an unknown opcode and
// the return are not visible here. The existing comment says the check is
// disabled for compatibility with b1_4, but how it is disabled is not visible.
1313 int mds_msg_check_version(struct lustre_msg *msg)
1317 /* TODO: enable the below check while really introducing msg version.
1318 * it's disabled because it will break compatibility with b1_4.
1324 case MDS_DISCONNECT:
1326 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1328 CERROR("bad opc %u version %08x, expecting %08x\n",
1329 msg->opc, msg->version, LUSTRE_OBD_VERSION);
1333 case MDS_GETATTR_NAME:
1338 case MDS_DONE_WRITING:
1344 case MDS_QUOTACHECK:
1348 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1350 CERROR("bad opc %u version %08x, expecting %08x\n",
1351 msg->opc, msg->version, LUSTRE_MDS_VERSION);
1355 case LDLM_BL_CALLBACK:
1356 case LDLM_CP_CALLBACK:
1357 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1359 CERROR("bad opc %u version %08x, expecting %08x\n",
1360 msg->opc, msg->version, LUSTRE_DLM_VERSION);
1362 case OBD_LOG_CANCEL:
1363 case LLOG_ORIGIN_HANDLE_CREATE:
1364 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1365 case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1366 case LLOG_ORIGIN_HANDLE_READ_HEADER:
1367 case LLOG_ORIGIN_HANDLE_CLOSE:
1369 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1371 CERROR("bad opc %u version %08x, expecting %08x\n",
1372 msg->opc, msg->version, LUSTRE_LOG_VERSION);
1375 CERROR("MDS unknown opcode %d\n", msg->opc);
1380 EXPORT_SYMBOL(mds_msg_check_version);
/*
 * mds_handle() - request handler for the MDS services (it is the handler
 * passed to ptlrpc_init_svc() in mdt_setup() and to
 * target_handle_connect() below).
 *
 * Flow, as far as this view shows:
 *  - validate the message version with mds_msg_check_version();
 *  - for every opcode except MDS_CONNECT, require an export (otherwise the
 *    request fails with -ENOTCONN), assert that a request whose xid equals
 *    the export's last xid carries MSG_RESENT or MSG_REPLAY, and consult
 *    the recovery state: abort recovery if requested, or let
 *    mds_filter_recovery_request() decide whether to process the request
 *    now;
 *  - dispatch on the opcode to the matching mds_, ldlm_ or llog_ handler,
 *    each preceded by an OBD_FAIL_ injection point; the fallback path
 *    answers -ENOTSUPP through ptlrpc_error();
 *  - on success (and not for MDS_DISCONNECT) stamp the reply with the
 *    export's last xid and call target_committed_to_req(); a
 *    MSG_LAST_REPLAY request is queued with target_queue_final_reply()
 *    while the device is recovering, otherwise it fails with -ENOTCONN;
 *    finally the reply is sent with target_send_reply().
 *
 * Fix in this revision: the reint debug message indexed reint_names[]
 * with an inverted guard ("in range OR slot is NULL"), which reads outside
 * the table for an out-of-range opcode and can hand a NULL pointer to %s.
 * The guard is now "in range AND slot is not NULL".
 */
1382 int mds_handle(struct ptlrpc_request *req)
1384 int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
1386 struct mds_obd *mds = NULL; /* quell gcc overwarning */
1387 struct obd_device *obd = NULL;
1390 OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1392 LASSERT(current->journal_info == NULL);
1394 rc = mds_msg_check_version(req->rq_reqmsg);
1396 CERROR("MDS drop mal-formed request\n");
1400 /* XXX identical to OST */
1401 if (req->rq_reqmsg->opc != MDS_CONNECT) {
1402 struct mds_export_data *med;
1403 int recovering, abort_recovery;
1405 if (req->rq_export == NULL) {
1406 CERROR("operation %d on unconnected MDS from %s\n",
1407 req->rq_reqmsg->opc,
1408 libcfs_id2str(req->rq_peer));
1409 req->rq_status = -ENOTCONN;
1410 GOTO(out, rc = -ENOTCONN);
1413 med = &req->rq_export->exp_mds_data;
1414 obd = req->rq_export->exp_obd;
1415 mds = mds_req2mds(req);
1417 /* sanity check: if the xid matches, the request must
1418 * be marked as a resent or replayed */
1419 if (req->rq_xid == med->med_mcd->mcd_last_xid)
1420 LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
1421 (MSG_RESENT | MSG_REPLAY),
1422 "rq_xid "LPU64" matches last_xid, "
1423 "expected RESENT flag\n",
1425 /* else: note the opposite is not always true; a
1426 * RESENT req after a failover will usually not match
1427 * the last_xid, since it was likely never
1428 * committed. A REPLAYed request will almost never
1429 * match the last xid, however it could for a
1430 * committed, but still retained, open. */
1432 /* Check for aborted recovery. */
1433 spin_lock_bh(&obd->obd_processing_task_lock);
1434 abort_recovery = obd->obd_abort_recovery;
1435 recovering = obd->obd_recovering;
1436 spin_unlock_bh(&obd->obd_processing_task_lock);
1437 if (abort_recovery) {
1438 target_abort_recovery(obd);
1439 } else if (recovering) {
1440 rc = mds_filter_recovery_request(req, obd,
1442 if (rc || !should_process)
1447 switch (req->rq_reqmsg->opc) {
1449 DEBUG_REQ(D_INODE, req, "connect");
1450 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1451 rc = target_handle_connect(req, mds_handle);
1453 /* Now that we have an export, set mds. */
1455 * XXX nikita: these assignments are useless: mds is
1456 * never used below, and obd is only used for
1457 * MSG_LAST_REPLAY case, which never happens for
1460 obd = req->rq_export->exp_obd;
1461 mds = mds_req2mds(req);
1465 case MDS_DISCONNECT:
1466 DEBUG_REQ(D_INODE, req, "disconnect");
1467 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1468 rc = target_handle_disconnect(req);
1469 req->rq_status = rc; /* superfluous? */
1473 DEBUG_REQ(D_INODE, req, "getstatus");
1474 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1475 rc = mds_getstatus(req);
1479 DEBUG_REQ(D_INODE, req, "getattr");
1480 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1481 rc = mds_getattr(req, MDS_REQ_REC_OFF);
1485 DEBUG_REQ(D_INODE, req, "setxattr");
1486 OBD_FAIL_RETURN(OBD_FAIL_MDS_SETXATTR_NET, 0);
1487 rc = mds_setxattr(req);
1491 DEBUG_REQ(D_INODE, req, "getxattr");
1492 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETXATTR_NET, 0);
1493 rc = mds_getxattr(req);
1496 case MDS_GETATTR_NAME: {
1497 struct lustre_handle lockh = { 0 };
1498 DEBUG_REQ(D_INODE, req, "getattr_name");
1499 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1501 /* If this request gets a reconstructed reply, we won't be
1502 * acquiring any new locks in mds_getattr_name, so we don't
1505 rc = mds_getattr_name(MDS_REQ_REC_OFF, req,
1506 MDS_INODELOCK_UPDATE, &lockh);
1507 /* this non-intent call (from an ioctl) is special */
1508 req->rq_status = rc;
1509 if (rc == 0 && lustre_handle_is_used(&lockh))
1510 ldlm_lock_decref(&lockh, LCK_CR);
1514 DEBUG_REQ(D_INODE, req, "statfs");
1515 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1516 rc = mds_statfs(req);
1520 DEBUG_REQ(D_INODE, req, "readpage");
1521 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1522 rc = mds_readpage(req, MDS_REQ_REC_OFF);
1524 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
1531 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
1534 int size[] = { sizeof(struct mds_body), mds->mds_max_mdsize,
1535 mds->mds_max_cookiesize};
1538 /* NB only peek inside req now; mds_reint() will swab it */
1540 CERROR ("Can't inspect opcode\n");
1545 if (lustre_msg_swabbed (req->rq_reqmsg))
/* Only index reint_names[] when opc is in range and the slot is set. */
1548 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1549 (opc < sizeof(reint_names) / sizeof(reint_names[0]) &&
1550 reint_names[opc] != NULL) ? reint_names[opc] :
1553 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1555 if (opc == REINT_UNLINK || opc == REINT_RENAME)
1557 else if (opc == REINT_OPEN)
1562 rc = lustre_pack_reply(req, bufcount, size, NULL);
1566 rc = mds_reint(req, MDS_REQ_REC_OFF, NULL);
1567 fail = OBD_FAIL_MDS_REINT_NET_REP;
1572 DEBUG_REQ(D_INODE, req, "close");
1573 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1574 rc = mds_close(req, MDS_REQ_REC_OFF);
1577 case MDS_DONE_WRITING:
1578 DEBUG_REQ(D_INODE, req, "done_writing");
1579 OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
1580 rc = mds_done_writing(req, MDS_REQ_REC_OFF);
1584 DEBUG_REQ(D_INODE, req, "pin");
1585 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
1586 rc = mds_pin(req, MDS_REQ_REC_OFF);
1590 DEBUG_REQ(D_INODE, req, "sync");
1591 OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
1592 rc = mds_sync(req, MDS_REQ_REC_OFF);
1596 DEBUG_REQ(D_INODE, req, "set_info");
1597 rc = mds_set_info_rpc(req->rq_export, req);
1600 case MDS_QUOTACHECK:
1601 DEBUG_REQ(D_INODE, req, "quotacheck");
1602 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACHECK_NET, 0);
1603 rc = mds_handle_quotacheck(req);
1607 DEBUG_REQ(D_INODE, req, "quotactl");
1608 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACTL_NET, 0);
1609 rc = mds_handle_quotactl(req);
1613 DEBUG_REQ(D_INODE, req, "ping");
1614 rc = target_handle_ping(req);
1617 case OBD_LOG_CANCEL:
1618 CDEBUG(D_INODE, "log cancel\n");
1619 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1620 rc = -ENOTSUPP; /* la la la */
1624 DEBUG_REQ(D_INODE, req, "enqueue");
1625 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1626 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1627 ldlm_server_blocking_ast, NULL);
1628 fail = OBD_FAIL_LDLM_REPLY;
1631 DEBUG_REQ(D_INODE, req, "convert");
1632 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1633 rc = ldlm_handle_convert(req);
1635 case LDLM_BL_CALLBACK:
1636 case LDLM_CP_CALLBACK:
1637 DEBUG_REQ(D_INODE, req, "callback");
1638 CERROR("callbacks should not happen on MDS\n");
1640 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1642 case LLOG_ORIGIN_HANDLE_CREATE:
1643 DEBUG_REQ(D_INODE, req, "llog_init");
1644 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1645 rc = llog_origin_handle_create(req);
1647 case LLOG_ORIGIN_HANDLE_DESTROY:
1648 DEBUG_REQ(D_INODE, req, "llog_init");
1649 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1650 rc = llog_origin_handle_destroy(req);
1652 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1653 DEBUG_REQ(D_INODE, req, "llog next block");
1654 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1655 rc = llog_origin_handle_next_block(req);
1657 case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1658 DEBUG_REQ(D_INODE, req, "llog prev block");
1659 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1660 rc = llog_origin_handle_prev_block(req);
1662 case LLOG_ORIGIN_HANDLE_READ_HEADER:
1663 DEBUG_REQ(D_INODE, req, "llog read header");
1664 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1665 rc = llog_origin_handle_read_header(req);
1667 case LLOG_ORIGIN_HANDLE_CLOSE:
1668 DEBUG_REQ(D_INODE, req, "llog close");
1669 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1670 rc = llog_origin_handle_close(req);
1673 DEBUG_REQ(D_INODE, req, "llog catinfo");
1674 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1675 rc = llog_catinfo(req);
1678 req->rq_status = -ENOTSUPP;
1679 rc = ptlrpc_error(req);
1683 LASSERT(current->journal_info == NULL);
1685 /* If we're DISCONNECTing, the mds_export_data is already freed */
1686 if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
1687 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1688 req->rq_repmsg->last_xid =
1689 le64_to_cpu(med->med_mcd->mcd_last_xid);
1691 target_committed_to_req(req);
1697 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1698 if (obd && obd->obd_recovering) {
1699 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1700 return target_queue_final_reply(req, rc);
1702 /* Lost a race with recovery; let the error path DTRT. */
1703 rc = req->rq_status = -ENOTCONN;
1706 target_send_reply(req, rc, fail);
/*
 * mds_update_server_data() - write the MDS server record (struct
 * lr_server_data) to the file mds->mds_rcvd_filp.
 *
 * @obd:        the MDS device
 * @force_sync: passed through to fsfilt_write_record()
 *
 * The in-memory record first receives the current mds_last_transno in
 * little-endian form.  When the record does not carry the
 * OBD_INCOMPAT_COMMON_LR feature flag, a temporary copy (lsd_copy) is
 * allocated and filled using the old field layout; it is freed again
 * before returning.  The write runs between push_ctxt() and pop_ctxt() on
 * the device's lvfs context, and a failure is logged with CERROR.
 *
 * NOTE(review): the visible fsfilt_write_record() call passes lsd; confirm
 * that lsd is redirected to lsd_copy on the old-format path (that
 * assignment is not visible in this view).
 */
1710 /* Update the server data on disk. This stores the new mount_count and
1711 * also the last_rcvd value to disk. If we don't have a clean shutdown,
1712 * then the server last_rcvd value may be less than that of the clients.
1713 * This will alert us that we may need to do client recovery.
1715 * Also assumes for mds_last_transno that we are not modifying it (no locking).
1717 int mds_update_server_data(struct obd_device *obd, int force_sync)
1719 struct mds_obd *mds = &obd->u.mds;
1720 struct lr_server_data *lsd = mds->mds_server_data;
1721 struct lr_server_data *lsd_copy = NULL;
1722 struct file *filp = mds->mds_rcvd_filp;
1723 struct lvfs_run_ctxt saved;
1728 CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
1729 mds->mds_mount_count, mds->mds_last_transno);
1731 lsd->lsd_last_transno = cpu_to_le64(mds->mds_last_transno);
1733 if (!(lsd->lsd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
1734 /* Swap to the old mds_server_data format, in case
1735 someone wants to revert to a pre-1.6 lustre */
1736 CDEBUG(D_CONFIG, "writing old last_rcvd format\n");
1737 /* malloc new struct instead of swap in-place because
1738 we don't have a lock on the last_trasno or mount count -
1739 someone may modify it while we're here, and we don't want
1740 them to inc the wrong thing. */
1741 OBD_ALLOC(lsd_copy, sizeof(*lsd_copy));
1745 lsd_copy->lsd_unused = lsd->lsd_last_transno;
1746 lsd_copy->lsd_last_transno = lsd->lsd_mount_count;
1750 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1751 rc = fsfilt_write_record(obd, filp, lsd, sizeof(*lsd), &off,force_sync);
1752 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1754 CERROR("error writing MDS server data: rc = %d\n", rc);
1757 OBD_FREE(lsd_copy, sizeof(*lsd_copy));
/*
 * fsoptions_to_mds_flags() - translate filesystem mount options into MDS
 * feature flags.
 *
 * @mds:     MDS state whose mds_fl_user_xattr / mds_fl_acl flags are set
 * @options: comma-separated option string; scanned one option at a time
 *           (p advances to the next ',' or the end of the string)
 *
 * "user_xattr" sets mds_fl_user_xattr.  "acl" sets mds_fl_acl when the
 * kernel is built with CONFIG_FS_POSIX_ACL; otherwise a warning is printed.
 * The memmove() shifts the rest of the string over the current option,
 * so @options can be modified in place (this appears to belong to the
 * unsupported-acl path -- the preprocessor branch is not visible here).
 */
1763 void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
1770 while (*p && *p != ',')
1774 if (len == sizeof("user_xattr") - 1 &&
1775 memcmp(options, "user_xattr", len) == 0) {
1776 mds->mds_fl_user_xattr = 1;
1777 } else if (len == sizeof("acl") - 1 &&
1778 memcmp(options, "acl", len) == 0) {
1779 #ifdef CONFIG_FS_POSIX_ACL
1780 mds->mds_fl_acl = 1;
1782 CWARN("ignoring unsupported acl mount option\n");
1783 memmove(options, p, strlen(p) + 1);
/*
 * mds_lov_presetup() - early LOV-related setup for the MDS.
 *
 * Starts the llog commit thread.  When the config carries at least four
 * buffers and buffer 3 is non-empty, a random uuid is generated and
 * unparsed into mds->mds_lov_uuid, and buffer 3 (the profile name) is
 * copied into a freshly allocated mds->mds_profile of the same length.
 */
1790 static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
1795 rc = llog_start_commit_thread();
1799 if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
1802 generate_random_uuid(uuid);
1803 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
1805 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
1806 if (mds->mds_profile == NULL)
1809 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
1810 LUSTRE_CFG_BUFLEN(lcfg, 3));
/*
 * mds_setup() - the o_setup method of mds_obd_ops.
 *
 * Visible steps:
 *  - reject configs with fewer than 3 buffers, or with an empty buffer 1
 *    or buffer 2 (-EINVAL);
 *  - look the device up with server_get_mount(); the comments below
 *    distinguish the case where lustre_fill_super already mounted the disk
 *    from the "old path" used by lctl, which builds an option string in a
 *    freshly allocated page ("iopen_nopriv" plus lcfg buffer 4, also fed
 *    to fsoptions_to_mds_flags()) and mounts with do_kern_mount();
 *  - obtain the fsfilt operations, initialise the epoch semaphore, the
 *    transno spinlock and the default md/cookie sizes, create the
 *    "mds-<uuid>" LDLM namespace and register mds_intent_policy on it;
 *  - run mds_fs_setup(), mds_lov_presetup(), ptlrpc_init_client() for the
 *    LDLM callback portals, lquota_setup(), upcall_cache_init() and
 *    mds_postsetup() (obd_async_recov is set only around that last call);
 *  - register lprocfs entries and print a console message saying whether
 *    the target starts in recovery.
 * The tail of the function holds the error-unwind calls (quota, fs, group
 * hash, namespace, fsfilt ops, mount).
 */
1815 /* mount the file system (secretly). lustre_cfg parameters are:
1821 static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
1823 struct lprocfs_static_vars lvars;
1824 struct mds_obd *mds = &obd->u.mds;
1825 struct lustre_mount_info *lmi;
1826 struct vfsmount *mnt;
1827 struct obd_uuid uuid;
1829 char *options, *str, *label;
1835 /* setup 1:/dev/loop/0 2:ext3 3:mdsA 4:errors=remount-ro,iopen_nopriv */
1837 CLASSERT(offsetof(struct obd_device, u.obt) ==
1838 offsetof(struct obd_device, u.mds.mds_obt));
1840 if (lcfg->lcfg_bufcount < 3)
1841 RETURN(rc = -EINVAL);
1843 if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0)
1844 RETURN(rc = -EINVAL);
1846 lmi = server_get_mount(obd->obd_name);
1848 /* We already mounted in lustre_fill_super.
1849 lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
1850 struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb);
1852 obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
1854 /* old path - used by lctl */
1855 CERROR("Using old MDS mount method\n");
1856 page = __get_free_page(GFP_KERNEL);
1860 options = (char *)page;
1861 memset(options, 0, PAGE_SIZE);
1863 /* here we use "iopen_nopriv" hardcoded, because it affects
1864 * MDS utility and the rest of options are passed by mount
1865 * options. Probably this should be moved to somewhere else
1866 * like startup scripts or lconf. */
1867 strcpy(options, "iopen_nopriv");
1869 if (LUSTRE_CFG_BUFLEN(lcfg, 4) > 0 && lustre_cfg_buf(lcfg, 4)) {
1870 sprintf(options + strlen(options), ",%s",
1871 lustre_cfg_string(lcfg, 4));
1872 fsoptions_to_mds_flags(mds, options);
1875 mnt = do_kern_mount(lustre_cfg_string(lcfg, 2), 0,
1876 lustre_cfg_string(lcfg, 1),
1881 LCONSOLE_ERROR("Can't mount disk %s (%d)\n",
1882 lustre_cfg_string(lcfg, 1), rc);
1886 obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2));
1888 if (IS_ERR(obd->obd_fsops))
1889 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
1891 CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt);
1893 LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
1895 //sema_init(&mds->mds_orphan_recovery_sem, 1);
1896 sema_init(&mds->mds_epoch_sem, 1);
1897 spin_lock_init(&mds->mds_transno_lock);
1898 mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1899 mds->mds_max_cookiesize = sizeof(struct llog_cookie);
1900 mds->mds_atime_diff = MAX_ATIME_DIFF;
1902 sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
1903 obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1904 if (obd->obd_namespace == NULL) {
1906 GOTO(err_ops, rc = -ENOMEM);
1908 ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
1910 rc = mds_fs_setup(obd, mnt);
1912 CERROR("%s: MDS filesystem method init failed: rc = %d\n",
1917 rc = mds_lov_presetup(mds, lcfg);
1921 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1922 "mds_ldlm_client", &obd->obd_ldlm_client);
1923 obd->obd_replayable = 1;
1925 rc = lquota_setup(quota_interface, obd, lcfg);
1929 mds->mds_group_hash = upcall_cache_init(obd->obd_name);
1930 if (IS_ERR(mds->mds_group_hash)) {
1931 rc = PTR_ERR(mds->mds_group_hash);
1932 mds->mds_group_hash = NULL;
1933 GOTO(err_qctxt, rc);
1936 /* Don't wait for mds_postrecov trying to clear orphans */
1937 obd->obd_async_recov = 1;
1938 rc = mds_postsetup(obd);
1939 obd->obd_async_recov = 0;
1941 GOTO(err_qctxt, rc);
1943 lprocfs_init_vars(mds, &lvars);
1944 lprocfs_obd_setup(obd, lvars.obd_vars);
1946 uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb);
1947 if (uuid_ptr != NULL) {
1948 class_uuid_unparse(uuid_ptr, &uuid);
1954 label = fsfilt_get_label(obd, obd->u.obt.obt_sb);
1955 if (obd->obd_recovering) {
1956 LCONSOLE_WARN("MDT %s now serving %s (%s%s%s), but will be in "
1957 "recovery until %d %s reconnect, or if no clients"
1958 " reconnect for %d:%.02d; during that time new "
1959 "clients will not be allowed to connect. "
1960 "Recovery progress can be monitored by watching "
1961 "/proc/fs/lustre/mds/%s/recovery_status.\n",
1962 obd->obd_name, lustre_cfg_string(lcfg, 1),
1963 label ?: "", label ? "/" : "", str,
1964 obd->obd_recoverable_clients,
1965 (obd->obd_recoverable_clients == 1) ?
1966 "client" : "clients",
1967 (int)(OBD_RECOVERY_TIMEOUT) / 60,
1968 (int)(OBD_RECOVERY_TIMEOUT) % 60,
1971 LCONSOLE_INFO("MDT %s now serving %s (%s%s%s) with recovery "
1972 "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1),
1973 label ?: "", label ? "/" : "", str,
1974 obd->obd_replayable ? "enabled" : "disabled");
1982 lquota_cleanup(quota_interface, obd);
1984 /* No extra cleanup needed for llog_init_commit_thread() */
1985 mds_fs_cleanup(obd);
1986 upcall_cache_cleanup(mds->mds_group_hash);
1987 mds->mds_group_hash = NULL;
1989 ldlm_namespace_free(obd->obd_namespace, 0);
1990 obd->obd_namespace = NULL;
1992 fsfilt_put_ops(obd->obd_fsops);
1995 server_put_mount(obd->obd_name, mds->mds_vfsmnt);
1999 mntput(mds->mds_vfsmnt);
2002 obd->u.obt.obt_sb = NULL;
/*
 * mds_lov_clean() - tear down the MDS's connection to its LOV/OSC device.
 *
 * Deletes and frees the profile name (if any), then, for the device in
 * mds->mds_osc_obd: unregisters the observer, copies this device's
 * obd_force / obd_fail shutdown flags onto it, disconnects
 * mds->mds_osc_exp, runs class_manual_cleanup() on it and clears
 * mds_osc_exp.  One visible path returns PTR_ERR(osc); its condition is
 * not visible in this view.
 *
 * NOTE(review): the profile is freed with strlen() + 1 while
 * mds_lov_presetup() allocates LUSTRE_CFG_BUFLEN(lcfg, 3) bytes -- confirm
 * the two sizes always agree.
 */
2006 static int mds_lov_clean(struct obd_device *obd)
2008 struct mds_obd *mds = &obd->u.mds;
2009 struct obd_device *osc = mds->mds_osc_obd;
2012 if (mds->mds_profile) {
2013 class_del_profile(mds->mds_profile);
2014 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
2015 mds->mds_profile = NULL;
2018 /* There better be a lov */
2022 RETURN(PTR_ERR(osc));
2024 obd_register_observer(osc, NULL);
2026 /* Give lov our same shutdown flags */
2027 osc->obd_force = obd->obd_force;
2028 osc->obd_fail = obd->obd_fail;
2030 /* Cleanup the lov */
2031 obd_disconnect(mds->mds_osc_exp);
2032 class_manual_cleanup(osc);
2033 mds->mds_osc_exp = NULL;
/*
 * mds_postsetup() - second setup stage, called from mds_setup().
 *
 * Sets up the LLOG_CONFIG_ORIG_CTXT and LLOG_LOVEA_ORIG_CTXT llog
 * contexts.  When a profile name was recorded, the profile is looked up
 * (-ENOENT if it does not exist) and the MDS connects to the device named
 * by lprof->lp_osc through mds_lov_connect().  The error path cleans up
 * both llog contexts.
 */
2038 static int mds_postsetup(struct obd_device *obd)
2040 struct mds_obd *mds = &obd->u.mds;
2044 rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
2049 rc = llog_setup(obd, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
2054 if (mds->mds_profile) {
2055 struct lustre_profile *lprof;
2056 /* The profile defines which osc and mdc to connect to, for a
2057 client. We reuse that here to figure out the name of the
2058 lov to use (and ignore lprof->lp_mdc).
2059 The profile was set in the config log with
2060 LCFG_MOUNTOPT profilenm oscnm mdcnm */
2061 lprof = class_get_profile(mds->mds_profile);
2062 if (lprof == NULL) {
2063 CERROR("No profile found: %s\n", mds->mds_profile);
2064 GOTO(err_cleanup, rc = -ENOENT);
2066 rc = mds_lov_connect(obd, lprof->lp_osc);
2068 GOTO(err_cleanup, rc);
2075 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2076 llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
/*
 * mds_postrecov() - the o_postrecov method of mds_obd_ops.
 *
 * Asserts that the device is no longer recovering and that the
 * LLOG_MDS_OST_ORIG_CTXT llog context exists, then:
 *  - calls mds_lov_set_nextid() (a failure is logged);
 *  - cleans the PENDING directory with mds_cleanup_pending(), but only
 *    when the device name differs from MDD_OBD_NAME;
 *  - notifies the LOV device via obd_notify(), using
 *    OBD_NOTIFY_SYNC_NONBLOCK when obd_async_recov is set and
 *    OBD_NOTIFY_SYNC otherwise;
 *  - runs quota recovery.
 */
2080 int mds_postrecov(struct obd_device *obd)
2088 LASSERT(!obd->obd_recovering);
2089 LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
2091 /* FIXME why not put this in the synchronize? */
2092 /* set nextid first, so we are sure it happens */
2093 rc = mds_lov_set_nextid(obd);
2095 CERROR("%s: mds_lov_set_nextid failed %d\n",
2100 /* clean PENDING dir */
2101 if (strcmp(obd->obd_name, MDD_OBD_NAME))
2102 rc = mds_cleanup_pending(obd);
2106 /* FIXME Does target_finish_recovery really need this to block? */
2107 /* Notify the LOV, which will in turn call mds_notify for each tgt */
2108 /* This means that we have to hack obd_notify to think we're obd_set_up
2109 during mds_lov_connect. */
2110 obd_notify(obd->u.mds.mds_osc_obd, NULL,
2111 obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
2112 OBD_NOTIFY_SYNC, NULL);
2114 /* quota recovery */
2115 lquota_recovery(quota_interface, obd);
/*
 * mds_lov_early_clean() - forward an early cleanup to the LOV/OSC device.
 *
 * Takes an early exit when there is no mds_osc_obd, or when neither
 * obd_force nor obd_fail is set on @obd (the value returned on that path
 * is not visible in this view).  Otherwise it returns the result of
 * obd_precleanup(osc, OBD_CLEANUP_EARLY).
 */
2121 /* We need to be able to stop an mds_lov_synchronize */
2122 static int mds_lov_early_clean(struct obd_device *obd)
2124 struct mds_obd *mds = &obd->u.mds;
2125 struct obd_device *osc = mds->mds_osc_obd;
2127 if (!osc || (!obd->obd_force && !obd->obd_fail))
2130 CDEBUG(D_HA, "abort inflight\n");
2131 return (obd_precleanup(osc, OBD_CLEANUP_EARLY));
/*
 * mds_precleanup() - the o_precleanup method of mds_obd_ops; acts on the
 * cleanup @stage.
 *
 * Visible per-stage work:
 *  - OBD_CLEANUP_EXPORTS: target_cleanup_recovery() (skipped when the
 *    device name equals MDD_OBD_NAME), then mds_lov_early_clean();
 *  - OBD_CLEANUP_SELF_EXP: mds_lov_disconnect(), cleanup of the
 *    LLOG_CONFIG_ORIG_CTXT and LLOG_LOVEA_ORIG_CTXT llog contexts, then
 *    obd_llog_finish(), whose result is stored in rc.
 * The bodies of OBD_CLEANUP_EARLY and OBD_CLEANUP_OBD are not visible in
 * this view.
 */
2134 static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2140 case OBD_CLEANUP_EARLY:
2142 case OBD_CLEANUP_EXPORTS:
2143 /*XXX Use this for mdd mds cleanup, so comment out
2144 *this target_cleanup_recovery for this tmp MDD MDS
2146 if (strcmp(obd->obd_name, MDD_OBD_NAME))
2147 target_cleanup_recovery(obd);
2148 mds_lov_early_clean(obd);
2150 case OBD_CLEANUP_SELF_EXP:
2151 mds_lov_disconnect(obd);
2153 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2154 llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
2155 rc = obd_llog_finish(obd, 0);
2157 case OBD_CLEANUP_OBD:
/*
 * mds_cleanup() - the o_cleanup method of mds_obd_ops.
 *
 * Does nothing further when the device has no superblock (obt_sb is
 * NULL).  Otherwise, in order: remembers the backing device, drops the
 * reference on the LOV export, removes lprocfs entries, cleans up quota,
 * writes the server data with a forced sync, frees mds_lov_objids, runs
 * mds_fs_cleanup(), frees the group upcall cache, releases the mount
 * (server_put_mount(), with mntput() for the old mount method), frees the
 * LDLM namespace, cancels the recovery timer if recovery was still in
 * progress, clears the read-only marker on the saved device, releases the
 * fsfilt ops and prints a console message.  The ll_kernel_locked() check
 * guards code related to the kernel lock whose body is not visible in
 * this view.
 */
2163 static int mds_cleanup(struct obd_device *obd)
2165 struct mds_obd *mds = &obd->u.mds;
2166 lvfs_sbdev_type save_dev;
2168 int must_relock = 0;
2171 if (obd->u.obt.obt_sb == NULL)
2173 save_dev = lvfs_sbdev(obd->u.obt.obt_sb);
2175 if (mds->mds_osc_exp)
2176 /* lov export was disconnected by mds_lov_clean;
2177 we just need to drop our ref */
2178 class_export_put(mds->mds_osc_exp);
2180 lprocfs_obd_cleanup(obd);
2182 lquota_cleanup(quota_interface, obd);
2184 mds_update_server_data(obd, 1);
2185 if (mds->mds_lov_objids != NULL)
2186 OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
2187 mds_fs_cleanup(obd);
2189 upcall_cache_cleanup(mds->mds_group_hash);
2190 mds->mds_group_hash = NULL;
2192 must_put = server_put_mount(obd->obd_name, mds->mds_vfsmnt);
2193 /* must_put is for old method (l_p_m returns non-0 on err) */
2195 /* We can only unlock kernel if we are in the context of sys_ioctl,
2196 otherwise we never called lock_kernel */
2197 if (ll_kernel_locked()) {
2203 /* In case we didn't mount with lustre_get_mount -- old method*/
2204 mntput(mds->mds_vfsmnt);
2205 obd->u.obt.obt_sb = NULL;
2207 ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
2209 spin_lock_bh(&obd->obd_processing_task_lock);
2210 if (obd->obd_recovering) {
2211 target_cancel_recovery_timer(obd);
2212 obd->obd_recovering = 0;
2214 spin_unlock_bh(&obd->obd_processing_task_lock);
2216 lvfs_clear_rdonly(save_dev);
2221 fsfilt_put_ops(obd->obd_fsops);
2223 LCONSOLE_INFO("MDT %s has stopped.\n", obd->obd_name);
/*
 * fixup_handle_for_resent_req() - for a request flagged MSG_RESENT, find
 * the lock the server already granted for it.
 *
 * @req:      the incoming request
 * @offset:   index of the request buffer holding the struct ldlm_request
 * @new_lock: lock being processed now; skipped during the search
 * @old_lock: out parameter receiving a reference to the matching lock
 *            (LDLM_LOCK_GET); mds_intent_policy() passes NULL here on its
 *            open path -- NOTE(review): confirm the store is guarded, the
 *            guard is not visible in this view
 * @lockh:    out parameter receiving the matching lock's handle cookie
 *
 * Nothing is done unless MSG_RESENT is set.  Under the namespace lock the
 * export's held-locks list is searched for a lock whose remote handle
 * cookie equals the one in the request; on a match the cookie is copied
 * into @lockh and the function returns.  Without a match, a comparison
 * against the export's last xid decides whether the request is still
 * treated as resent; otherwise MSG_RESENT is cleared so the request is
 * handled like a new one.
 */
2228 static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset,
2229 struct ldlm_lock *new_lock,
2230 struct ldlm_lock **old_lock,
2231 struct lustre_handle *lockh)
2233 struct obd_export *exp = req->rq_export;
2234 struct obd_device *obd = exp->exp_obd;
2235 struct ldlm_request *dlmreq =
2236 lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq));
2237 struct lustre_handle remote_hdl = dlmreq->lock_handle1;
2238 struct list_head *iter;
2240 if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2243 l_lock(&obd->obd_namespace->ns_lock);
2244 list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2245 struct ldlm_lock *lock;
2246 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2247 if (lock == new_lock)
2249 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2250 lockh->cookie = lock->l_handle.h_cookie;
2251 LDLM_DEBUG(lock, "restoring lock cookie");
2252 DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
2255 *old_lock = LDLM_LOCK_GET(lock);
2256 l_unlock(&obd->obd_namespace->ns_lock);
2260 l_unlock(&obd->obd_namespace->ns_lock);
2262 /* If the xid matches, then we know this is a resent request,
2263 * and allow it. (It's probably an OPEN, for which we don't
2266 le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
2269 /* This remote handle isn't enqueued, so we never received or
2270 * processed this request. Clear MSG_RESENT, because it can
2271 * be handled like any normal request now. */
2273 lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2275 DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
/* Test disposition bit(s) @flag in the reply's lock_policy_res1 field;
 * returns the masked value, so non-zero means at least one bit is set. */
2279 int intent_disposition(struct ldlm_reply *rep, int flag)
2283 return (rep->lock_policy_res1 & flag);
/* Set disposition bit(s) @flag in the reply's lock_policy_res1 field. */
2286 void intent_set_disposition(struct ldlm_reply *rep, int flag)
2290 rep->lock_policy_res1 |= flag;
/*
 * mds_intent_policy() - LDLM intent callback for the MDS namespace
 * (registered with ldlm_register_intent() in mds_setup()).
 *
 * @lockp:      the lock being enqueued
 * @req_cookie: the ptlrpc request carrying the intent
 * @flags:      LDLM flags; LDLM_FL_INTENT_ONLY is checked when no lock
 *              resulted from the operation
 *
 * When the request has no intent buffer, only a reply holding a struct
 * ldlm_reply is packed.  Otherwise the ldlm_intent is unpacked and the
 * reply is sized: ldlm_reply, mds_body and the maximum MD size, plus
 * either an ACL buffer (export negotiated OBD_CONNECT_ACL and the intent
 * is OPEN, GETATTR or LOOKUP) or a cookie buffer (UNLINK).  The reply is
 * marked DISP_IT_EXECD and the intent is executed: the open/create path
 * calls mds_reint(), the getattr path calls mds_getattr_name(), and the
 * result goes into lock_policy_res2.  Several outcomes return
 * ELDLM_LOCK_ABORTED.
 *
 * When the operation produced a lock, that lock is handed to the client
 * instead of the requested one: its reader/writer counts are zeroed, it is
 * attached to the request's export, the ASTs and remote handle are copied
 * from the requested lock, LDLM_FL_LOCAL is cleared, and
 * ELDLM_LOCK_REPLACED is returned.  A lock already owned by this export
 * means the reply was reconstructed; that case also returns
 * ELDLM_LOCK_REPLACED.
 */
2293 static int mds_intent_policy(struct ldlm_namespace *ns,
2294 struct ldlm_lock **lockp, void *req_cookie,
2295 ldlm_mode_t mode, int flags, void *data)
2297 struct ptlrpc_request *req = req_cookie;
2298 struct ldlm_lock *lock = *lockp;
2299 struct ldlm_intent *it;
2300 struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
2301 struct ldlm_reply *rep;
2302 struct lustre_handle lockh = { 0 };
2303 struct ldlm_lock *new_lock = NULL;
2304 int getattr_part = MDS_INODELOCK_UPDATE;
2305 int repsize[4] = {sizeof(*rep),
2306 sizeof(struct mds_body),
2307 mds->mds_max_mdsize};
2308 int repbufcnt = 3, offset = MDS_REQ_INTENT_REC_OFF;
2312 LASSERT(req != NULL);
2314 if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
2315 /* No intent was provided */
2316 int size = sizeof(struct ldlm_reply);
2317 rc = lustre_pack_reply(req, 1, &size, NULL);
2322 it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
2323 lustre_swab_ldlm_intent);
2325 CERROR("Intent missing\n");
2326 RETURN(req->rq_status = -EFAULT);
2329 LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
2331 if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
2332 (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP)))
2333 /* we should never allow OBD_CONNECT_ACL if not configured */
2334 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
2335 else if (it->opc & IT_UNLINK)
2336 repsize[repbufcnt++] = mds->mds_max_cookiesize;
2338 rc = lustre_pack_reply(req, repbufcnt, repsize, NULL);
2340 RETURN(req->rq_status = rc);
2342 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
2343 intent_set_disposition(rep, DISP_IT_EXECD);
2346 /* execute policy */
2347 switch ((long)it->opc) {
2349 case IT_CREAT|IT_OPEN:
2350 fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
2351 lock, NULL, &lockh);
2352 /* XXX swab here to assert that an mds_open reint
2353 * packet is following */
2354 rep->lock_policy_res2 = mds_reint(req, offset, &lockh);
2356 /* We abort the lock if the lookup was negative and
2357 * we did not make it to the OPEN portion */
2358 if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
2359 RETURN(ELDLM_LOCK_ABORTED);
2360 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
2361 !intent_disposition(rep, DISP_OPEN_OPEN))
2363 RETURN(ELDLM_LOCK_ABORTED);
2366 getattr_part = MDS_INODELOCK_LOOKUP;
2368 getattr_part |= MDS_INODELOCK_LOOKUP;
2370 fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
2371 lock, &new_lock, &lockh);
2373 /* INODEBITS_INTEROP: if this lock was converted from a
2374 * plain lock (client does not support inodebits), then
2375 * child lock must be taken with both lookup and update
2376 * bits set for all operations.
2378 if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
2379 getattr_part = MDS_INODELOCK_LOOKUP |
2380 MDS_INODELOCK_UPDATE;
2382 rep->lock_policy_res2 = mds_getattr_name(offset, req,
2383 getattr_part, &lockh);
2384 /* FIXME: LDLM can set req->rq_status. MDS sets
2385 policy_res{1,2} with disposition and status.
2386 - replay: returns 0 & req->status is old status
2387 - otherwise: returns req->status */
2388 if (intent_disposition(rep, DISP_LOOKUP_NEG))
2389 rep->lock_policy_res2 = 0;
2390 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
2391 rep->lock_policy_res2)
2392 RETURN(ELDLM_LOCK_ABORTED);
2393 if (req->rq_status != 0) {
2395 rep->lock_policy_res2 = req->rq_status;
2396 RETURN(ELDLM_LOCK_ABORTED);
2400 CERROR("Unhandled intent "LPD64"\n", it->opc);
2404 /* By this point, whatever function we called above must have either
2405 * filled in 'lockh', been an intent replay, or returned an error. We
2406 * want to allow replayed RPCs to not get a lock, since we would just
2407 * drop it below anyways because lock replay is done separately by the
2408 * client afterwards. For regular RPCs we want to give the new lock to
2409 * the client instead of whatever lock it was about to get. */
2410 if (new_lock == NULL)
2411 new_lock = ldlm_handle2lock(&lockh);
2412 if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
2415 LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
2416 it->opc, lockh.cookie);
2418 /* If we've already given this lock to a client once, then we should
2419 * have no readers or writers. Otherwise, we should have one reader
2420 * _or_ writer ref (which will be zeroed below) before returning the
2421 * lock to a client. */
2422 if (new_lock->l_export == req->rq_export) {
2423 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2425 LASSERT(new_lock->l_export == NULL);
2426 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2431 if (new_lock->l_export == req->rq_export) {
2432 /* Already gave this to the client, which means that we
2433 * reconstructed a reply. */
2434 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2436 RETURN(ELDLM_LOCK_REPLACED);
2439 /* Fixup the lock to be given to the client */
2440 l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
2441 new_lock->l_readers = 0;
2442 new_lock->l_writers = 0;
2444 new_lock->l_export = class_export_get(req->rq_export);
2445 list_add(&new_lock->l_export_chain,
2446 &new_lock->l_export->exp_ldlm_data.led_held_locks);
2448 new_lock->l_blocking_ast = lock->l_blocking_ast;
2449 new_lock->l_completion_ast = lock->l_completion_ast;
2451 memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
2452 sizeof(lock->l_remote_handle));
2454 new_lock->l_flags &= ~LDLM_FL_LOCAL;
2456 LDLM_LOCK_PUT(new_lock);
2457 l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
2459 RETURN(ELDLM_LOCK_REPLACED);
/*
 * mdt_setup() - the o_setup method of mdt_obd_ops: starts the request
 * services that feed mds_handle().
 *
 * Registers lprocfs entries, initialises mds_health_sem, clamps
 * mds_num_threads (values below 2 become MDT_NUM_THREADS, values above
 * MDT_MAX_THREADS become MDT_MAX_THREADS) and then creates and starts
 * three ptlrpc services, all handled by mds_handle():
 *   - the main service on MDS_REQUEST_PORTAL,
 *   - "mds_setattr" on MDS_SETATTR_PORTAL,
 *   - "mds_readpage" on MDS_READPAGE_PORTAL.
 * On success the ping evictor is started.  The tail of the function is
 * the unwind path: it unregisters the services in reverse order and
 * removes the lprocfs entries.
 *
 * Fix in this revision: the failure message for the setattr service said
 * "getattr"; it now names the service that actually failed.
 */
2462 static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2464 struct mds_obd *mds = &obd->u.mds;
2465 struct lprocfs_static_vars lvars;
2469 lprocfs_init_vars(mdt, &lvars);
2470 lprocfs_obd_setup(obd, lvars.obd_vars);
2472 sema_init(&mds->mds_health_sem, 1);
2474 if (mds_num_threads < 2)
2475 mds_num_threads = MDT_NUM_THREADS;
2476 if (mds_num_threads > MDT_MAX_THREADS)
2477 mds_num_threads = MDT_MAX_THREADS;
2480 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2481 MDS_MAXREPSIZE, MDS_REQUEST_PORTAL,
2482 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2483 mds_handle, LUSTRE_MDS_NAME,
2484 obd->obd_proc_entry, NULL, mds_num_threads, 0);
2486 if (!mds->mds_service) {
2487 CERROR("failed to start service\n");
2488 GOTO(err_lprocfs, rc = -ENOMEM);
2491 rc = ptlrpc_start_threads(obd, mds->mds_service, "ll_mdt");
2493 GOTO(err_thread, rc);
2495 mds->mds_setattr_service =
2496 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2497 MDS_MAXREPSIZE, MDS_SETATTR_PORTAL,
2498 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2499 mds_handle, "mds_setattr",
2500 obd->obd_proc_entry, NULL, mds_num_threads, 0);
2501 if (!mds->mds_setattr_service) {
2502 CERROR("failed to start setattr service\n");
2503 GOTO(err_thread, rc = -ENOMEM);
2506 rc = ptlrpc_start_threads(obd, mds->mds_setattr_service,
2509 GOTO(err_thread2, rc);
2511 mds->mds_readpage_service =
2512 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2513 MDS_MAXREPSIZE, MDS_READPAGE_PORTAL,
2514 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2515 mds_handle, "mds_readpage",
2516 obd->obd_proc_entry, NULL, mds_num_threads, 0);
2517 if (!mds->mds_readpage_service) {
2518 CERROR("failed to start readpage service\n");
2519 GOTO(err_thread2, rc = -ENOMEM);
2522 rc = ptlrpc_start_threads(obd, mds->mds_readpage_service,
2526 GOTO(err_thread3, rc);
2528 ping_evictor_start();
2533 ptlrpc_unregister_service(mds->mds_readpage_service);
2534 mds->mds_readpage_service = NULL;
2536 ptlrpc_unregister_service(mds->mds_setattr_service);
2537 mds->mds_setattr_service = NULL;
2539 ptlrpc_unregister_service(mds->mds_service);
2540 mds->mds_service = NULL;
2542 lprocfs_obd_cleanup(obd);
/*
 * mdt_cleanup() - the o_cleanup method of mdt_obd_ops.
 *
 * Stops the ping evictor, then, while holding mds_health_sem (the same
 * semaphore mdt_health_check() takes), unregisters the readpage, setattr
 * and main services and clears their pointers.  Finally removes the
 * lprocfs entries.
 */
2546 static int mdt_cleanup(struct obd_device *obd)
2548 struct mds_obd *mds = &obd->u.mds;
2551 ping_evictor_stop();
2553 down(&mds->mds_health_sem);
2554 ptlrpc_unregister_service(mds->mds_readpage_service);
2555 ptlrpc_unregister_service(mds->mds_setattr_service);
2556 ptlrpc_unregister_service(mds->mds_service);
2557 mds->mds_readpage_service = NULL;
2558 mds->mds_setattr_service = NULL;
2559 mds->mds_service = NULL;
2560 up(&mds->mds_health_sem);
2562 lprocfs_obd_cleanup(obd);
/*
 * mdt_health_check() - the o_health_check method of mdt_obd_ops.
 *
 * Under mds_health_sem, ORs together the results of
 * ptlrpc_service_health_check() for the readpage, setattr and main
 * services.  Per the comment fragment in the body, the health check is
 * meant to yield 0 when healthy and 1 when unhealthy; the final
 * normalisation of rc is not visible in this view.
 */
2567 static int mdt_health_check(struct obd_device *obd)
2569 struct mds_obd *mds = &obd->u.mds;
2572 down(&mds->mds_health_sem);
2573 rc |= ptlrpc_service_health_check(mds->mds_readpage_service);
2574 rc |= ptlrpc_service_health_check(mds->mds_setattr_service);
2575 rc |= ptlrpc_service_health_check(mds->mds_service);
2576 up(&mds->mds_health_sem);
2579 * health_check to return 0 on healthy
2580 * and 1 on unhealthy.
/*
 * lvfs fid-to-dentry callback; installed as l_fid2dentry in mds_lvfs_ops.
 * Treats the opaque callback argument as the obd_device, stores the
 * generation `gen` in a local fid, and delegates the lookup to
 * mds_fid2dentry() on that device's mds_obd.  Returns whatever
 * mds_fid2dentry() returns.
 */
2588 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2591 struct obd_device *obd = data;
2594 fid.generation = gen;
2595 return mds_fid2dentry(&obd->u.mds, &fid, NULL);
/*
 * Health check for the MDS device (wired in as .o_health_check of
 * mds_obd_ops).  Tests the target superblock for MS_RDONLY and ORs in the
 * result of an I/O health probe on mds_health_check_filp.
 */
2598 static int mds_health_check(struct obd_device *obd)
2600 struct obd_device_target *odt = &obd->u.obt;
2601 struct mds_obd *mds = &obd->u.mds;
/* Test whether the target superblock carries the read-only mount flag. */
2604 if (odt->obt_sb->s_flags & MS_RDONLY)
/* The health-check file must already be set before it is probed. */
2607 LASSERT(mds->mds_health_check_filp != NULL);
/* "!!" collapses any non-zero lvfs_check_io_health() result to 1. */
2608 rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp);
/*
 * lvfs callback table for the MDS.  The l_fid2dentry slot points at
 * mds_lvfs_fid2dentry(); it is written with the older GNU "field:"
 * initializer syntax rather than C99 ".field =".
 */
2613 struct lvfs_callback_ops mds_lvfs_ops = {
2614 l_fid2dentry: mds_lvfs_fid2dentry,
2617 /* use obd ops to offer management infrastructure */
/*
 * obd method table for the MDS device type.  mds_init() passes it to
 * init_obd_quota_ops() and registers it under LUSTRE_MDS_NAME.
 */
2618 static struct obd_ops mds_obd_ops = {
2619 .o_owner = THIS_MODULE,
2620 .o_connect = mds_connect,
2621 .o_reconnect = mds_reconnect,
2622 .o_init_export = mds_init_export,
2623 .o_destroy_export = mds_destroy_export,
2624 .o_disconnect = mds_disconnect,
2625 .o_setup = mds_setup,
2626 .o_precleanup = mds_precleanup,
2627 .o_cleanup = mds_cleanup,
2628 .o_postrecov = mds_postrecov,
2629 .o_statfs = mds_obd_statfs,
2630 .o_iocontrol = mds_iocontrol,
2631 .o_create = mds_obd_create,
2632 .o_destroy = mds_obd_destroy,
2633 .o_llog_init = mds_llog_init,
2634 .o_llog_finish = mds_llog_finish,
2635 .o_notify = mds_notify,
2636 .o_health_check = mds_health_check,
/*
 * obd method table for the MDT device type; registered under
 * LUSTRE_MDT_NAME by mds_init().  Only setup, cleanup and health check
 * are provided.
 */
2639 static struct obd_ops mdt_obd_ops = {
2640 .o_owner = THIS_MODULE,
2641 .o_setup = mdt_setup,
2642 .o_cleanup = mdt_cleanup,
2643 .o_health_check = mdt_health_check,
/*
 * quota_interface is filled in by mds_init() from
 * PORTAL_SYMBOL_GET(mds_quota_interface) and handed to lquota_init(),
 * init_obd_quota_ops() and lquota_exit().
 * NOTE(review): mds_quota_interface is presumably provided by the quota
 * module and resolved at load time - confirm.
 */
2646 quota_interface_t *quota_interface;
2647 quota_interface_t mds_quota_interface;
/*
 * Initialisation routine for the MDS and MDT obd types: acquires the quota
 * interface, initialises lquota, installs the quota methods into
 * mds_obd_ops and registers both obd types with their lprocfs variables.
 *
 * Marked unused because module_init() in this file names mds_cmd_init(),
 * not this function.
 */
2649 static __attribute__((unused)) int __init mds_init(void)
2652 struct lprocfs_static_vars lvars;
2654 quota_interface = PORTAL_SYMBOL_GET(mds_quota_interface);
2655 rc = lquota_init(quota_interface);
/* Drops the symbol reference taken by PORTAL_SYMBOL_GET above. */
2657 if (quota_interface)
2658 PORTAL_SYMBOL_PUT(mds_quota_interface);
2661 init_obd_quota_ops(quota_interface, &mds_obd_ops);
/*
 * NOTE(review): the results of both class_register_type() calls below are
 * not captured - confirm whether a registration failure should be
 * reported to the caller.
 */
2663 lprocfs_init_vars(mds, &lvars);
2664 class_register_type(&mds_obd_ops, NULL,
2665 lvars.module_vars, LUSTRE_MDS_NAME, NULL);
2666 lprocfs_init_vars(mdt, &lvars);
2667 class_register_type(&mdt_obd_ops, NULL,
2668 lvars.module_vars, LUSTRE_MDT_NAME, NULL);
/*
 * Counterpart of mds_init(): shuts down lquota, releases the quota
 * interface symbol reference if one is held, and unregisters the MDS and
 * MDT obd types.  Marked unused because module_exit() in this file names
 * mds_cmd_exit().
 */
2673 static __attribute__((unused)) void /*__exit*/ mds_exit(void)
2675 lquota_exit(quota_interface);
2676 if (quota_interface)
2677 PORTAL_SYMBOL_PUT(mds_quota_interface);
2679 class_unregister_type(LUSTRE_MDS_NAME);
2680 class_unregister_type(LUSTRE_MDT_NAME);
2682 /* The MDS still needs LOV setup here. */
/*
 * Setup method of mds_cmd_obd_ops.
 *
 * Looks up the server mount named by config string 4, obtains the fsfilt
 * operations for it, and then, inside the obd's lvfs context:
 *   - creates the "OBJECTS" directory (stored in mds_objects_dir),
 *   - looks up "__iopen__" (stored in mds_fid_de),
 *   - opens or creates the LOV_OBJID file (stored in mds_lov_objid_filp)
 *     and checks that it is a regular file,
 *   - runs mds_lov_presetup() and mds_postsetup(),
 *   - records the default maximum MD and cookie sizes.
 *
 * The trailing statements release, in order, the LOV_OBJID file,
 * mds_fid_de, mds_objects_dir and the fsfilt operations for the error
 * paths.
 */
2683 static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2685 struct mds_obd *mds = &obd->u.mds;
2686 struct lvfs_run_ctxt saved;
2688 struct vfsmount *mnt;
2689 struct lustre_sb_info *lsi;
2690 struct lustre_mount_info *lmi;
2691 struct dentry *dentry;
2696 CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name);
/* strcmp() is non-zero when this obd is not named MDD_OBD_NAME. */
2697 if (strcmp(obd->obd_name, MDD_OBD_NAME))
/* Config buffer index 4 is read below, so at least five are required. */
2700 if (lcfg->lcfg_bufcount < 5) {
2701 CERROR("invalid arg for setup %s\n", MDD_OBD_NAME);
2704 dev = lustre_cfg_string(lcfg, 4);
2705 lmi = server_get_mount(dev);
/* A missing mount is treated as a programming error, not a runtime one. */
2706 LASSERT(lmi != NULL);
2708 lsi = s2lsi(lmi->lmi_sb);
2711 obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
2712 mds_init_ctxt(obd, mnt);
/* The directory and file operations below run in the obd's lvfs context. */
2714 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2715 dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
2716 if (IS_ERR(dentry)) {
2717 rc = PTR_ERR(dentry);
2718 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
2719 GOTO(err_putfs, rc);
2721 mds->mds_objects_dir = dentry;
2723 dentry = lookup_one_len("__iopen__", current->fs->pwd,
2724 strlen("__iopen__"));
2725 if (IS_ERR(dentry)) {
2726 rc = PTR_ERR(dentry);
2727 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
2728 GOTO(err_objects, rc);
2731 mds->mds_fid_de = dentry;
/* A successful lookup can still yield a negative or bad-inode dentry. */
2732 if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
2734 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
2738 /* open and test the lov objid file */
2739 file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
2742 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
2743 GOTO(err_fid, rc = PTR_ERR(file));
2745 mds->mds_lov_objid_filp = file;
2746 if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
2747 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
2748 file->f_dentry->d_inode->i_mode);
2749 GOTO(err_lov_objid, rc = -ENOENT);
/*
 * NOTE(review): the two failure paths below jump to err_objects although
 * mds_lov_objid_filp and mds_fid_de are already held at this point.
 * Confirm that err_objects also releases them; otherwise the open file and
 * the __iopen__ dentry reference leak, and err_lov_objid would be the
 * matching target.
 */
2752 rc = mds_lov_presetup(mds, lcfg);
2754 GOTO(err_objects, rc);
2756 /* Don't wait for mds_postrecov trying to clear orphans */
2757 obd->obd_async_recov = 1;
2758 rc = mds_postsetup(obd);
2759 obd->obd_async_recov = 0;
2762 GOTO(err_objects, rc);
2764 mds->mds_max_mdsize = sizeof(struct lov_mds_md);
2765 mds->mds_max_cookiesize = sizeof(struct llog_cookie);
2768 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Error unwinding: release resources in reverse order of acquisition. */
2771 if (mds->mds_lov_objid_filp &&
2772 filp_close((struct file *)mds->mds_lov_objid_filp, 0))
2773 CERROR("can't close %s after error\n", LOV_OBJID);
2775 dput(mds->mds_fid_de);
2777 dput(mds->mds_objects_dir);
2779 fsfilt_put_ops(obd->obd_fsops);
/*
 * Cleanup method of mds_cmd_obd_ops; releases what mds_cmd_setup()
 * acquired.  Closes the LOV_OBJID file and drops the OBJECTS directory
 * dentry inside the obd's lvfs context, then prunes and drops the
 * __iopen__ dentry, turns quota off on the target superblock and releases
 * the fsfilt operations.
 */
2783 static int mds_cmd_cleanup(struct obd_device *obd)
2785 struct mds_obd *mds = &obd->u.mds;
2786 struct lvfs_run_ctxt saved;
2791 LCONSOLE_WARN("%s: shutting down for failover; client state "
2792 "will be preserved.\n", obd->obd_name);
2794 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2795 if (mds->mds_lov_objid_filp) {
2796 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
/* The pointer is cleared whether or not filp_close() succeeded. */
2797 mds->mds_lov_objid_filp = NULL;
2799 CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
2801 if (mds->mds_objects_dir != NULL) {
2802 l_dput(mds->mds_objects_dir);
2803 mds->mds_objects_dir = NULL;
2806 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
/* Prune cached children of __iopen__ before dropping its reference. */
2807 shrink_dcache_parent(mds->mds_fid_de);
2808 dput(mds->mds_fid_de);
2809 LL_DQUOT_OFF(obd->u.obt.obt_sb);
2810 fsfilt_put_ops(obd->obd_fsops);
/*
 * Health-check hook intended for mds_cmd_obd_ops.  It is currently not
 * wired in: the .o_health_check entry of that table is commented out.
 */
2816 static int mds_cmd_health_check(struct obd_device *obd)
/*
 * obd method table registered by mds_cmd_init() under LUSTRE_MDS_NAME.
 * It uses the cmd-specific setup/cleanup and shares the remaining methods
 * with mds_obd_ops; no health check is installed (entry disabled below).
 */
2821 static struct obd_ops mds_cmd_obd_ops = {
2822 .o_owner = THIS_MODULE,
2823 .o_setup = mds_cmd_setup,
2824 .o_cleanup = mds_cmd_cleanup,
2825 .o_precleanup = mds_precleanup,
2826 .o_create = mds_obd_create,
2827 .o_destroy = mds_obd_destroy,
2828 .o_llog_init = mds_llog_init,
2829 .o_llog_finish = mds_llog_finish,
2830 .o_notify = mds_notify,
2831 // .o_health_check = mds_cmd_health_check,
/*
 * Module entry point (named by module_init() in this file).  Registers
 * mds_cmd_obd_ops as the LUSTRE_MDS_NAME obd type, using the lprocfs
 * variables initialised for "mds".
 * NOTE(review): the result of class_register_type() is not captured on
 * the call below - confirm whether a failure should be propagated.
 */
2834 static int __init mds_cmd_init(void)
2836 struct lprocfs_static_vars lvars;
2838 lprocfs_init_vars(mds, &lvars);
2839 class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
2840 LUSTRE_MDS_NAME, NULL);
/*
 * Module exit point (named by module_exit() in this file).  Unregisters
 * the LUSTRE_MDS_NAME obd type registered by mds_cmd_init().
 */
2845 static void /*__exit*/ mds_cmd_exit(void)
2847 class_unregister_type(LUSTRE_MDS_NAME);
/*
 * Module metadata and entry points.  The cmd variants are the ones wired
 * in here; mds_init()/mds_exit() are marked unused and are not referenced
 * by these macros.
 */
2850 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2851 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
2852 MODULE_LICENSE("GPL");
2854 module_init(mds_cmd_init);
2855 module_exit(mds_cmd_exit);