1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
14 * This file is part of the Lustre file system, http://www.lustre.org
15 * Lustre is a trademark of Cluster File Systems, Inc.
17 * You may have signed or agreed to another license before downloading
18 * this software. If so, you are bound by the terms and conditions
19 * of that agreement, and the following does not apply to you. See the
20 * LICENSE file included with this distribution for more information.
22 * If you did not agree to a different license, then this copy of Lustre
23 * is open source software; you can redistribute it and/or modify it
24 * under the terms of version 2 of the GNU General Public License as
25 * published by the Free Software Foundation.
27 * In either case, Lustre is distributed in the hope that it will be
28 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 * license text for more details.
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_MDS
38 #include <linux/module.h>
43 #include <linux/lustre_ver.h>
45 * struct OBD_{ALLOC,FREE}*()
48 #include <linux/obd_support.h>
50 #include <linux/lu_object.h>
/* Number of MDT service threads (module-scope tunable; see mdt_mod_init()).
 * NOTE(review): fragment -- embedded original line numbers skip lines here. */
55 * Initialized in mdt_mod_init().
57 unsigned long mdt_num_threads;
/* MDS_GETSTATUS handler: packs a single-buffer reply whose mds_body is
 * filled with the root fid obtained via mdo_root_get() on the child
 * md_device. Error paths handle reply-pack failure and the
 * OBD_FAIL_MDS_GETSTATUS_PACK fault-injection point.
 * NOTE(review): line-sampled fragment -- declarations (e.g. `result`),
 * braces and the final return are not visible here; do not assume them. */
59 static int mdt_getstatus(struct mdt_thread_info *info,
60 struct ptlrpc_request *req, int offset)
62 struct md_device *mdd = info->mti_mdt->mdt_child;
63 struct mds_body *body;
64 int size = sizeof *body;
69 result = lustre_pack_reply(req, 1, &size, NULL);
71 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
73 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
76 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
77 result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
80 /* the last_committed and last_xid fields are filled in for all
81 * replies already - no need to do so here also.
89 #include <linux/obd.h>
91 * struct class_connect()
93 #include <linux/obd_class.h>
97 #include <linux/lustre_export.h>
99 * struct mds_client_data
101 #include <../mds/mds_internal.h>
102 #include <linux/lustre_mds.h>
103 #include <linux/lustre_fsfilt.h>
104 #include <linux/lprocfs_status.h>
105 #include <linux/lustre_commit_confd.h>
106 #include <linux/lustre_quota.h>
107 #include <linux/lustre_disk.h>
108 #include <linux/lustre_ver.h>
/* Forward declarations for static handlers defined later in this file. */
110 static int mds_intent_policy(struct ldlm_namespace *ns,
111 struct ldlm_lock **lockp, void *req_cookie,
112 ldlm_mode_t mode, int flags, void *data);
113 static int mds_postsetup(struct obd_device *obd);
114 static int mds_cleanup(struct obd_device *obd);
/* Bulk-sends `count` bytes of directory data from `file` starting at
 * `offset` to the client via a BULK_PUT_SOURCE ptlrpc bulk descriptor:
 * allocates npages pages, reads them with fsfilt_readpage(), starts the
 * transfer and waits (bounded by obd_timeout/4) for completion.  On a
 * failed/short transfer the client export is evicted (class_fail_export).
 * Cleanup labels free pages and the bulk descriptor in reverse order.
 * NOTE(review): line-sampled fragment -- the `pages` declaration, several
 * if-conditions, kunmap calls, label lines and closing braces are missing
 * from this view; do not infer their exact form. */
116 /* Assumes caller has already pushed into the kernel filesystem context */
117 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
118 loff_t offset, int count)
120 struct ptlrpc_bulk_desc *desc;
121 struct l_wait_info lwi;
123 int rc = 0, npages, i, tmpcount, tmpsize = 0;
126 LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */
128 npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT;
129 OBD_ALLOC(pages, sizeof(*pages) * npages);
131 GOTO(out, rc = -ENOMEM);
133 desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
136 GOTO(out_free, rc = -ENOMEM);
/* First pass: allocate and register one page per bulk fragment. */
138 for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
139 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
141 pages[i] = alloc_pages(GFP_KERNEL, 0);
142 if (pages[i] == NULL)
143 GOTO(cleanup_buf, rc = -ENOMEM);
145 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
/* Second pass: fill the pages from the directory file. */
148 for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
149 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
150 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
151 tmpsize, offset, file->f_dentry->d_inode->i_ino,
152 file->f_dentry->d_inode->i_size);
154 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
155 kmap(pages[i]), tmpsize, &offset);
159 GOTO(cleanup_buf, rc = -EIO);
162 LASSERT(desc->bd_nob == count);
164 rc = ptlrpc_start_bulk_transfer(desc);
166 GOTO(cleanup_buf, rc);
168 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
169 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
170 OBD_FAIL_MDS_SENDPAGE, rc);
171 GOTO(abort_bulk, rc);
174 lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
175 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
176 LASSERT (rc == 0 || rc == -ETIMEDOUT);
179 if (desc->bd_success &&
180 desc->bd_nob_transferred == count)
181 GOTO(cleanup_buf, rc);
183 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
186 DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
187 (rc == -ETIMEDOUT) ? "timeout" : "network error",
188 desc->bd_nob_transferred, count,
189 req->rq_export->exp_client_uuid.uuid,
190 req->rq_export->exp_connection->c_remote_uuid.uuid);
192 class_fail_export(req->rq_export);
196 ptlrpc_abort_bulk (desc);
198 for (i = 0; i < npages; i++)
200 __free_pages(pages[i], 0);
202 ptlrpc_free_bulk(desc);
204 OBD_FREE(pages, sizeof(*pages) * npages);
/* Resolves `fid` to a dentry via mds_fid2dentry() and takes an inodebits
 * DLM lock (resource keyed by inode number/generation) on it in
 * `lock_mode`, returning the lock handle in `lockh`.  An enqueue failure
 * is mapped to ERR_PTR(-EIO).
 * NOTE(review): line-sampled fragment -- `rc`/`flags` declarations, the
 * IS_ERR(de) early-return and the dput/return tail are not visible here. */
209 /* only valid locked dentries or errors should be returned */
210 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
211 struct vfsmount **mnt, int lock_mode,
212 struct lustre_handle *lockh,
213 char *name, int namelen, __u64 lockpart)
215 struct mds_obd *mds = &obd->u.mds;
216 struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
217 struct ldlm_res_id res_id = { .name = {0} };
219 ldlm_policy_data_t policy = { .l_inodebits = { lockpart} };
225 res_id.name[0] = de->d_inode->i_ino;
226 res_id.name[1] = de->d_inode->i_generation;
227 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
228 LDLM_IBITS, &policy, lock_mode, &flags,
229 ldlm_blocking_ast, ldlm_completion_ast,
230 NULL, NULL, NULL, 0, NULL, lockh);
231 if (rc != ELDLM_OK) {
233 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
/* Looks up a dentry by inode number: formats the ino as "0x%lx" and
 * resolves it under the special mds_fid_de directory.  Rejects inodes
 * with zero generation or zero link count (possible disk corruption) and,
 * when a generation is supplied, inodes whose generation does not match
 * -- all mapped to ERR_PTR(-ENOENT).  On success *mnt (if requested) is
 * set to mds->mds_vfsmnt.
 * NOTE(review): line-sampled fragment -- `fid_name`/`inode` declarations,
 * the ino==0 check preceding the ESTALE return, dput() calls on the error
 * paths and the final RETURN are not visible here. */
239 /* Look up an entry by inode number. */
240 /* this function ONLY returns valid dget'd dentries with an initialized inode
242 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
243 struct vfsmount **mnt)
246 unsigned long ino = fid->id;
247 __u32 generation = fid->generation;
249 struct dentry *result;
252 RETURN(ERR_PTR(-ESTALE));
254 snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
256 CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
257 ino, generation, mds->mds_obt.obt_sb);
259 /* under ext3 this is neither supposed to return bad inodes
261 result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
265 inode = result->d_inode;
267 RETURN(ERR_PTR(-ENOENT));
269 if (inode->i_generation == 0 || inode->i_nlink == 0) {
270 LCONSOLE_WARN("Found inode with zero generation or link -- this"
271 " may indicate disk corruption (inode: %lu, link:"
272 " %lu, count: %d)\n", inode->i_ino,
273 (unsigned long)inode->i_nlink,
274 atomic_read(&inode->i_count));
276 RETURN(ERR_PTR(-ENOENT));
279 if (generation && inode->i_generation != generation) {
280 /* we didn't find the right inode.. */
281 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
282 "count: %d, generation %u/%u\n", inode->i_ino,
283 (unsigned long)inode->i_nlink,
284 atomic_read(&inode->i_count), inode->i_generation,
287 RETURN(ERR_PTR(-ENOENT));
291 *mnt = mds->mds_vfsmnt;
/* Negotiates connect flags between client and MDS: masks the client's
 * requested flags/ibits to what this MDS supports, drops OBD_CONNECT_IBITS
 * when no inodebits are known (plain-lock compat mode), drops ACL/XATTR
 * flags the target has disabled, then records the agreed flags on the
 * export and reports the server version back in ocd_version.  Warns when
 * the MDS requires ACLs but the client did not negotiate them.
 * NOTE(review): line-sampled fragment -- the NULL-data guard, return value
 * and closing brace are not visible here. */
298 static int mds_connect_internal(struct obd_export *exp,
299 struct obd_connect_data *data)
301 struct obd_device *obd = exp->exp_obd;
303 data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
304 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
306 /* If no known bits (which should not happen, probably,
307 as everybody should support LOOKUP and UPDATE bits at least)
308 revert to compat mode with plain locks. */
309 if (!data->ocd_ibits_known &&
310 data->ocd_connect_flags & OBD_CONNECT_IBITS)
311 data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
313 if (!obd->u.mds.mds_fl_acl)
314 data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
316 if (!obd->u.mds.mds_fl_user_xattr)
317 data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
319 exp->exp_connect_flags = data->ocd_connect_flags;
320 data->ocd_version = LUSTRE_VERSION_CODE;
321 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
324 if (obd->u.mds.mds_fl_acl &&
325 ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
326 CWARN("%s: MDS requires ACL support but client does not\n",
/* Reconnect hook: validates arguments and re-runs flag negotiation on the
 * existing export via mds_connect_internal().
 * NOTE(review): fragment -- the error return for NULL args and the final
 * RETURN(rc) are not visible here. */
333 static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
334 struct obd_uuid *cluuid,
335 struct obd_connect_data *data)
340 if (exp == NULL || obd == NULL || cluuid == NULL)
343 rc = mds_connect_internal(exp, data);
/* MDS_CONNECT handler: aborts any stale recovery, creates the export via
 * class_connect()/class_conn2export(), negotiates flags, allocates the
 * per-client mds_client_data (uuid recorded for recovery) and registers
 * it with mds_client_add().  The error path frees mcd, disconnects and
 * drops the export reference.
 * NOTE(review): line-sampled fragment -- the `if (abort_recovery)` guard
 * before target_abort_recovery(), several error-branch lines, `med->med_mcd`
 * assignment and the final RETURN are not visible here. */
348 /* Establish a connection to the MDS.
350 * This will set up an export structure for the client to hold state data
351 * about that client, like open files, the last operation number it did
352 * on the server, etc.
354 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
355 struct obd_uuid *cluuid, struct obd_connect_data *data)
357 struct obd_export *exp;
358 struct mds_export_data *med;
359 struct mds_client_data *mcd = NULL;
360 int rc, abort_recovery;
363 if (!conn || !obd || !cluuid)
366 /* Check for aborted recovery. */
367 spin_lock_bh(&obd->obd_processing_task_lock);
368 abort_recovery = obd->obd_abort_recovery;
369 spin_unlock_bh(&obd->obd_processing_task_lock);
371 target_abort_recovery(obd);
373 /* XXX There is a small race between checking the list and adding a
374 * new connection for the same UUID, but the real threat (list
375 * corruption when multiple different clients connect) is solved.
377 * There is a second race between adding the export to the list,
378 * and filling in the client data below. Hence skipping the case
379 * of NULL mcd above. We should already be controlling multiple
380 * connects at the client, and we can't hold the spinlock over
381 * memory allocations without risk of deadlocking.
383 rc = class_connect(conn, obd, cluuid);
386 exp = class_conn2export(conn);
388 med = &exp->exp_mds_data;
390 rc = mds_connect_internal(exp, data);
394 OBD_ALLOC(mcd, sizeof(*mcd));
396 GOTO(out, rc = -ENOMEM);
398 memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
401 rc = mds_client_add(obd, &obd->u.mds, med, -1);
407 OBD_FREE(mcd, sizeof(*mcd));
410 class_disconnect(exp);
412 class_export_put(exp);
/* Initializes per-export MDS state: empty open-file list and its lock.
 * NOTE(review): fragment -- the return statement is not visible here. */
418 static int mds_init_export(struct obd_export *exp)
420 struct mds_export_data *med = &exp->exp_mds_data;
422 INIT_LIST_HEAD(&med->med_open_head);
423 spin_lock_init(&med->med_open_lock);
/* Tears down an export: after target_destroy_export(), force-closes every
 * open file handle on med_open_head under a pushed lvfs context (dropping
 * med_open_lock around each mds_mfd_close, which may sleep / unlink
 * orphans), then frees the client data.  Self-exports (uuid == obd uuid)
 * skip the file-close pass.
 * NOTE(review): line-sampled fragment -- `rc` declaration, GOTO after the
 * uuid check, error accumulation and the final RETURN are not visible. */
428 static int mds_destroy_export(struct obd_export *export)
430 struct mds_export_data *med;
431 struct obd_device *obd = export->exp_obd;
432 struct lvfs_run_ctxt saved;
436 med = &export->exp_mds_data;
437 target_destroy_export(export);
439 if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
442 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
443 /* Close any open files (which may also cause orphan unlinking). */
444 spin_lock(&med->med_open_lock);
445 while (!list_empty(&med->med_open_head)) {
446 struct list_head *tmp = med->med_open_head.next;
447 struct mds_file_data *mfd =
448 list_entry(tmp, struct mds_file_data, mfd_list);
449 struct dentry *dentry = mfd->mfd_dentry;
451 /* Remove mfd handle so it can't be found again.
452 * We are consuming the mfd_list reference here. */
453 mds_mfd_unlink(mfd, 0);
454 spin_unlock(&med->med_open_lock);
456 /* If you change this message, be sure to update
457 * replay_single:test_46 */
458 CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for "
459 "%.*s (ino %lu)\n", obd->obd_name, dentry->d_name.len,
460 dentry->d_name.name, dentry->d_inode->i_ino);
461 /* child orphan sem protects orphan_dec_test and
462 * is_orphan race, mds_mfd_close drops it */
463 MDS_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode);
464 rc = mds_mfd_close(NULL, MDS_REQ_REC_OFF, obd, mfd,
465 !(export->exp_flags & OBD_OPT_FAILOVER));
468 CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
469 spin_lock(&med->med_open_lock);
471 spin_unlock(&med->med_open_lock);
472 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
474 mds_client_free(export);
/* MDS_DISCONNECT handler: pins the export, disconnects it first so the
 * client cannot keep using it, cancels its DLM locks, then completes all
 * outstanding difficult replies (each moved off exp_outstanding_replies
 * under the owning service's srv_lock) before dropping the reference.
 * NOTE(review): fragment -- `rc` declaration, the NULL-export guard and
 * the final RETURN are not visible here. */
478 static int mds_disconnect(struct obd_export *exp)
480 unsigned long irqflags;
485 class_export_get(exp);
487 /* Disconnect early so that clients can't keep using export */
488 rc = class_disconnect(exp);
489 ldlm_cancel_locks_for_export(exp);
491 /* complete all outstanding replies */
492 spin_lock_irqsave(&exp->exp_lock, irqflags);
493 while (!list_empty(&exp->exp_outstanding_replies)) {
494 struct ptlrpc_reply_state *rs =
495 list_entry(exp->exp_outstanding_replies.next,
496 struct ptlrpc_reply_state, rs_exp_list);
497 struct ptlrpc_service *svc = rs->rs_service;
499 spin_lock(&svc->srv_lock);
500 list_del_init(&rs->rs_exp_list);
501 ptlrpc_schedule_difficult_reply(rs);
502 spin_unlock(&svc->srv_lock);
504 spin_unlock_irqrestore(&exp->exp_lock, irqflags);
506 class_export_put(exp);
/* Reads the "lov" EA for `inode` into `md` (fsfilt_get_md) and converts
 * any old-format LOV EA in place via mds_convert_lov_ea().
 * NOTE(review): fragment -- the parameter list continues on missing lines
 * (size pointer, lock flag per the mds_pack_md caller), as do the locking
 * and return paths; confirm against the full source. */
510 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
518 rc = fsfilt_get_md(obd, inode, md, *size, "lov");
521 CERROR("Error %d reading eadata for ino %lu\n",
525 rc = mds_convert_lov_ea(obd, inode, md, lmm_size);
/* Packs the inode's LOV EA into reply buffer `offset` and sets
 * OBD_MD_FLDIREA (directories) or OBD_MD_FLEASIZE (files) plus
 * body->eadatasize on success.  The reply buffer length is sanity-checked
 * against mds_max_mdsize (oversized is only warned about, since extra
 * space is harmless).
 * NOTE(review): line-sampled fragment -- `lmm`/`lmm_size`/`rc`
 * declarations, the no-space early return and the rc<0 handling after
 * mds_get_md() are not visible here. */
543 /* Call with lock=1 if you want mds_pack_md to take the i_sem.
544 * Call with lock=0 if the caller has already taken the i_sem. */
545 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
546 struct mds_body *body, struct inode *inode, int lock)
548 struct mds_obd *mds = &obd->u.mds;
554 lmm = lustre_msg_buf(msg, offset, 0);
556 /* Some problem with getting eadata when I sized the reply
558 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
562 lmm_size = msg->buflens[offset];
564 /* I don't really like this, but it is a sanity check on the client
565 * MD request. However, if the client doesn't know how much space
566 * to reserve for the MD, it shouldn't be bad to have too much space.
568 if (lmm_size > mds->mds_max_mdsize) {
569 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
570 inode->i_ino, lmm_size, mds->mds_max_mdsize);
574 rc = mds_get_md(obd, inode, lmm, &lmm_size, lock);
576 if (S_ISDIR(inode->i_mode))
577 body->valid |= OBD_MD_FLDIREA;
579 body->valid |= OBD_MD_FLEASIZE;
580 body->eadatasize = lmm_size;
587 #ifdef CONFIG_FS_POSIX_ACL
/* Fetches the access-ACL xattr of `inode` into reply buffer `repoff`
 * using i_op->getxattr on a stack dentry, records its size in
 * repbody->aclsize and sets OBD_MD_FLACL.  -ENODATA (no ACL) is treated
 * as success with size 0.
 * NOTE(review): fragment -- `buflen`/`rc` declarations, early returns for
 * buflen==0 / missing getxattr, and the final RETURN are not visible. */
589 int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg,
590 struct mds_body *repbody, int repoff)
592 struct dentry de = { .d_inode = inode };
596 LASSERT(repbody->aclsize == 0);
597 LASSERT(repmsg->bufcount > repoff);
599 buflen = lustre_msg_buflen(repmsg, repoff);
603 if (!inode->i_op || !inode->i_op->getxattr)
607 rc = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS,
608 lustre_msg_buf(repmsg, repoff, buflen),
613 repbody->aclsize = rc;
614 else if (rc != -ENODATA) {
615 CERROR("buflen %d, get acl: %d\n", buflen, rc);
620 repbody->valid |= OBD_MD_FLACL;
/* Non-ACL kernels: packing an ACL is a no-op that reports success. */
624 #define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0
/* Thin wrapper: currently only POSIX ACLs are packed; `med` is unused.
 * NOTE(review): fragment -- the `int repoff` tail of the parameter list
 * falls on a missing line. */
627 int mds_pack_acl(struct mds_export_data *med, struct inode *inode,
628 struct lustre_msg *repmsg, struct mds_body *repbody,
631 return mds_pack_posix_acl(inode, repmsg, repbody, repoff);
/* Fills the reply mds_body at `reply_off` from dentry->d_inode: fid and
 * basic attributes always; then, depending on reqbody->valid, the LOV EA
 * (regular files / striped dirs -- reply shrunk to actual EA size), the
 * symlink target (OBD_MD_LINKNAME, NUL-terminated, eadatasize = len+1),
 * max EA/cookie sizes (OBD_MD_FLMODEASIZE) and the ACL (OBD_MD_FLACL,
 * only when the client negotiated OBD_CONNECT_ACL).  When no EA is
 * returned, size/blocks/atime/mtime are valid from the MDS itself.
 * NOTE(review): line-sampled fragment -- rc declaration, ENOENT guard for
 * a negative dentry, several closing braces and the final RETURN are not
 * visible here. */
634 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
635 struct ptlrpc_request *req,
636 struct mds_body *reqbody, int reply_off)
638 struct mds_body *body;
639 struct inode *inode = dentry->d_inode;
646 body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
647 LASSERT(body != NULL); /* caller prepped reply */
649 mds_pack_inode2fid(&body->fid1, inode);
650 mds_pack_inode2body(body, inode);
653 if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
654 (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
655 rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body,
658 /* If we have LOV EA data, the OST holds size, atime, mtime */
659 if (!(body->valid & OBD_MD_FLEASIZE) &&
660 !(body->valid & OBD_MD_FLDIREA))
661 body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
662 OBD_MD_FLATIME | OBD_MD_FLMTIME);
664 lustre_shrink_reply(req, reply_off, body->eadatasize, 0);
665 if (body->eadatasize)
667 } else if (S_ISLNK(inode->i_mode) &&
668 (reqbody->valid & OBD_MD_LINKNAME) != 0) {
669 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0);
672 LASSERT (symname != NULL); /* caller prepped reply */
673 len = req->rq_repmsg->buflens[reply_off];
675 rc = inode->i_op->readlink(dentry, symname, len);
677 CERROR("readlink failed: %d\n", rc);
678 } else if (rc != len - 1) {
679 CERROR ("Unexpected readlink rc %d: expecting %d\n",
683 CDEBUG(D_INODE, "read symlink dest %s\n", symname);
684 body->valid |= OBD_MD_LINKNAME;
685 body->eadatasize = rc + 1;
686 symname[rc] = 0; /* NULL terminate */
692 if (reqbody->valid & OBD_MD_FLMODEASIZE) {
693 struct mds_obd *mds = mds_req2mds(req);
694 body->max_cookiesize = mds->mds_max_cookiesize;
695 body->max_mdsize = mds->mds_max_mdsize;
696 body->valid |= OBD_MD_FLMODEASIZE;
702 #ifdef CONFIG_FS_POSIX_ACL
703 if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
704 (reqbody->valid & OBD_MD_FLACL)) {
705 rc = mds_pack_acl(&req->rq_export->exp_mds_data,
706 inode, req->rq_repmsg,
709 lustre_shrink_reply(req, reply_off, body->aclsize, 0);
/* Sizes and packs the getattr reply message: buffer 0 is the mds_body;
 * an optional second buffer is sized by probing the EA length
 * (fsfilt_get_md with NULL buf), the symlink length (i_size+1, capped by
 * what the client reserved in eadatasize), or the ACL xattr length --
 * mirroring the cases mds_getattr_internal() will later fill.
 * NOTE(review): line-sampled fragment -- the offset/`rc` bookkeeping,
 * `size[bufcount]` assignments on some branches, bufcount increments and
 * the final RETURN are not visible here. */
718 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
721 struct mds_obd *mds = mds_req2mds(req);
722 struct mds_body *body;
723 int rc, size[2] = {sizeof(*body)}, bufcount = 1;
726 body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
727 LASSERT(body != NULL); /* checked by caller */
728 LASSERT_REQSWABBED(req, offset); /* swabbed by caller */
730 if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
731 (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
733 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0,
736 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
739 if (rc != -ENODATA) {
740 CERROR("error getting inode %lu MD: rc = %d\n",
745 } else if (rc > mds->mds_max_mdsize) {
747 CERROR("MD size %d larger than maximum possible %u\n",
748 rc, mds->mds_max_mdsize);
753 } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
754 if (inode->i_size + 1 != body->eadatasize)
755 CERROR("symlink size: %Lu, reply space: %d\n",
756 inode->i_size + 1, body->eadatasize);
757 size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
759 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
760 inode->i_size + 1, body->eadatasize);
763 #ifdef CONFIG_FS_POSIX_ACL
764 if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
765 (body->valid & OBD_MD_FLACL)) {
766 struct dentry de = { .d_inode = inode };
769 if (inode->i_op && inode->i_op->getxattr) {
771 rc = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS,
776 if (rc != -ENODATA) {
777 CERROR("got acl size: %d\n", rc);
787 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
788 CERROR("failed MDS_GETATTR_PACK test\n");
789 req->rq_status = -ENOMEM;
793 rc = lustre_pack_reply(req, bufcount, size, NULL);
795 CERROR("lustre_pack_reply failed: rc %d\n", rc);
/* MDS_GETATTR_NAME / intent-getattr handler.  Swabs the request body and
 * name, sets up user credentials and the lvfs context, then either:
 * (a) does a raw lookup when only OBD_MD_FLID is requested, (b) takes
 * parent+child DLM locks and resolves the child by (parent fid, name),
 * or (c) for a RESENT request reuses the already-granted child lock and
 * rebuilds the child fid from the lock's resource name.  Finally packs
 * the reply via mds_getattr_pack_msg()/mds_getattr_internal(), returning
 * the child lock to the client.  Cleanup is phase-based: phase 2 drops
 * locks/dentries, phase 1 pops the lvfs context and exits credentials.
 * NOTE(review): line-sampled fragment -- `name`/`namesize`/`dir`/`inum`
 * declarations, many error branches, GOTO targets, dput() calls and
 * closing braces are not visible here; the control flow above is inferred
 * from the visible statements and should be confirmed against the full
 * source. */
803 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
804 int child_part, struct lustre_handle *child_lockh)
806 struct obd_device *obd = req->rq_export->exp_obd;
807 struct mds_obd *mds = &obd->u.mds;
808 struct ldlm_reply *rep = NULL;
809 struct lvfs_run_ctxt saved;
810 struct mds_body *body;
811 struct dentry *dparent = NULL, *dchild = NULL;
812 struct lvfs_ucred uc = {NULL,};
813 struct lustre_handle parent_lockh;
815 int rc = 0, cleanup_phase = 0, resent_req = 0;
819 LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
821 /* Swab now, before anyone looks inside the request */
823 body = lustre_swab_reqbuf(req, offset, sizeof(*body),
824 lustre_swab_mds_body);
826 CERROR("Can't swab mds_body\n");
830 LASSERT_REQSWAB(req, offset + 1);
831 name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
833 CERROR("Can't unpack name\n");
836 namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
838 rc = mds_init_ucred(&uc, req, offset);
842 LASSERT (offset == MDS_REQ_REC_OFF || offset == MDS_REQ_INTENT_REC_OFF);
843 /* if requests were at offset 2, the getattr reply goes back at 1 */
844 if (offset == MDS_REQ_INTENT_REC_OFF) {
845 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
849 push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
850 cleanup_phase = 1; /* kernel context */
851 intent_set_disposition(rep, DISP_LOOKUP_EXECD);
853 /* FIXME: handle raw lookup */
855 if (body->valid == OBD_MD_FLID) {
856 struct mds_body *mds_reply;
857 int size = sizeof(*mds_reply);
859 // The user requested ONLY the inode number, so do a raw lookup
860 rc = lustre_pack_reply(req, 1, &size, NULL);
862 CERROR("out of memory\n");
866 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
868 mds_reply = lustre_msg_buf(req->rq_repmsg, offset,
870 mds_reply->fid1.id = inum;
871 mds_reply->valid = OBD_MD_FLID;
876 if (lustre_handle_is_used(child_lockh)) {
877 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
881 if (resent_req == 0) {
883 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
884 &parent_lockh, &dparent,
886 MDS_INODELOCK_UPDATE,
888 child_lockh, &dchild, LCK_CR,
891 /* For revalidate by fid we always take UPDATE lock */
892 dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
895 MDS_INODELOCK_UPDATE);
898 rc = PTR_ERR(dchild);
903 struct ldlm_lock *granted_lock;
904 struct ll_fid child_fid;
905 struct ldlm_resource *res;
906 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
907 granted_lock = ldlm_handle2lock(child_lockh);
908 LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
909 body->fid1.id, body->fid1.generation,
910 child_lockh->cookie);
913 res = granted_lock->l_resource;
914 child_fid.id = res->lr_name.name[0];
915 child_fid.generation = res->lr_name.name[1];
916 dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL);
917 LASSERT(!IS_ERR(dchild));
918 LDLM_LOCK_PUT(granted_lock);
921 cleanup_phase = 2; /* dchild, dparent, locks */
923 if (dchild->d_inode == NULL) {
924 intent_set_disposition(rep, DISP_LOOKUP_NEG);
925 /* in the intent case, the policy clears this error:
926 the disposition is enough */
927 GOTO(cleanup, rc = -ENOENT);
929 intent_set_disposition(rep, DISP_LOOKUP_POS);
932 if (req->rq_repmsg == NULL) {
933 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
935 CERROR ("mds_getattr_pack_msg: %d\n", rc);
940 rc = mds_getattr_internal(obd, dchild, req, body, offset);
941 GOTO(cleanup, rc); /* returns the lock to the client */
944 switch (cleanup_phase) {
946 if (resent_req == 0) {
947 if (rc && dchild->d_inode)
948 ldlm_lock_decref(child_lockh, LCK_CR);
949 ldlm_lock_decref(&parent_lockh, LCK_CR);
954 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
956 mds_exit_ucred(&uc, mds);
957 if (req->rq_reply_state == NULL) {
959 lustre_pack_reply(req, 0, NULL, NULL);
/* MDS_GETATTR (by fid) handler: swabs the body, sets up credentials and
 * lvfs context, resolves the fid to a dentry, packs the reply and fills
 * it via mds_getattr_internal().  If no reply state exists by the end, a
 * zero-buffer reply is packed so the error can be returned.
 * NOTE(review): line-sampled fragment -- `de`/`rc` declarations, error
 * GOTOs, l_dput() and the final return are not visible here. */
965 static int mds_getattr(struct ptlrpc_request *req, int offset)
967 struct mds_obd *mds = mds_req2mds(req);
968 struct obd_device *obd = req->rq_export->exp_obd;
969 struct lvfs_run_ctxt saved;
971 struct mds_body *body;
972 struct lvfs_ucred uc = {NULL,};
976 body = lustre_swab_reqbuf(req, offset, sizeof(*body),
977 lustre_swab_mds_body);
981 rc = mds_init_ucred(&uc, req, offset);
985 push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
986 de = mds_fid2dentry(mds, &body->fid1, NULL);
988 rc = req->rq_status = PTR_ERR(de);
992 rc = mds_getattr_pack_msg(req, de->d_inode, offset);
994 CERROR("mds_getattr_pack_msg: %d\n", rc);
998 req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
1003 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1005 if (req->rq_reply_state == NULL) {
1006 req->rq_status = rc;
1007 lustre_pack_reply(req, 0, NULL, NULL);
1009 mds_exit_ucred(&uc, mds);
/* Refreshes obd_osfs (no older than max_age) under obd_osfs_lock and
 * copies it to the caller's osfs.
 * NOTE(review): fragment -- `rc` declaration, the success check before
 * the memcpy and the return are not visible here. */
1013 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1014 unsigned long max_age)
1018 spin_lock(&obd->obd_osfs_lock);
1019 rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age);
1021 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
1022 spin_unlock(&obd->obd_osfs_lock);
/* MDS_STATFS handler: packs an obd_statfs reply and fills it via
 * mds_obd_statfs() with ~1-jiffy caching.  Includes the
 * OBD_FAIL_MDS_STATFS_LCW_SLEEP watchdog-test delay and the
 * OBD_FAIL_MDS_STATFS_PACK fault point.
 * NOTE(review): fragment -- the max_age argument on the mds_obd_statfs
 * call, GOTO targets and the final return are not visible here. */
1027 static int mds_statfs(struct ptlrpc_request *req)
1029 struct obd_device *obd = req->rq_export->exp_obd;
1030 int rc, size = sizeof(struct obd_statfs);
1033 /* This will trigger a watchdog timeout */
1034 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
1035 (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
1037 rc = lustre_pack_reply(req, 1, &size, NULL);
1038 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
1039 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
1043 /* We call this so that we can cache a bit - 1 jiffie worth */
1044 rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size),
1047 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1053 req->rq_status = rc;
/* MDS_SYNC handler: fid1.id == 0 means sync the whole backing filesystem
 * (fsfilt_sync); otherwise resolve the fid and call the inode's fsync,
 * then pack the inode's fid/attributes into the reply body.
 * NOTE(review): fragment -- `de` declaration, dput/GOTO tails and the
 * final return are not visible; note the swab uses buffer 0 while other
 * handlers use `offset` -- confirm against the full source. */
1057 static int mds_sync(struct ptlrpc_request *req, int offset)
1059 struct obd_device *obd = req->rq_export->exp_obd;
1060 struct mds_obd *mds = &obd->u.mds;
1061 struct mds_body *body;
1062 int rc, size = sizeof(*body);
1065 body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
1067 GOTO(out, rc = -EFAULT);
1069 rc = lustre_pack_reply(req, 1, &size, NULL);
1070 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
1071 CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
1075 if (body->fid1.id == 0) {
1076 /* a fid of zero is taken to mean "sync whole filesystem" */
1077 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
1082 de = mds_fid2dentry(mds, &body->fid1, NULL);
1084 GOTO(out, rc = PTR_ERR(de));
1086 /* The file parameter isn't used for anything */
1087 if (de->d_inode->i_fop && de->d_inode->i_fop->fsync)
1088 rc = de->d_inode->i_fop->fsync(NULL, de, 1);
1090 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
1091 mds_pack_inode2fid(&body->fid1, de->d_inode);
1092 mds_pack_inode2body(body, de->d_inode);
1099 req->rq_status = rc;
/* MDS_READPAGE handler: opens the directory named by fid1, validates that
 * the requested offset (carried in body->size) and byte count (carried in
 * body->nlink) are block-aligned, reports the directory size in the reply,
 * and streams the pages to the client via mds_sendpage().
 * NOTE(review): line-sampled fragment -- `de`/`file` declarations, IS_ERR
 * checks, GOTO labels and the final return are not visible here. */
1103 /* mds_readpage does not take a DLM lock on the inode, because the client must
1104 * already have a PR lock.
1106 * If we were to take another one here, a deadlock will result, if another
1107 * thread is already waiting for a PW lock. */
1108 static int mds_readpage(struct ptlrpc_request *req, int offset)
1110 struct obd_device *obd = req->rq_export->exp_obd;
1111 struct mds_obd *mds = &obd->u.mds;
1112 struct vfsmount *mnt;
1115 struct mds_body *body, *repbody;
1116 struct lvfs_run_ctxt saved;
1117 int rc, size = sizeof(*repbody);
1118 struct lvfs_ucred uc = {NULL,};
1121 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
1124 rc = lustre_pack_reply(req, 1, &size, NULL);
1126 CERROR("error packing readpage reply: rc %d\n", rc);
1130 body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1131 lustre_swab_mds_body);
1133 GOTO (out, rc = -EFAULT);
1135 rc = mds_init_ucred(&uc, req, 0);
1139 push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1140 de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt);
1142 GOTO(out_pop, rc = PTR_ERR(de));
1144 CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1146 file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1147 /* note: in case of an error, dentry_open puts dentry */
1149 GOTO(out_pop, rc = PTR_ERR(file));
1151 /* body->size is actually the offset -eeb */
1152 if ((body->size & (de->d_inode->i_blksize - 1)) != 0) {
1153 CERROR("offset "LPU64" not on a block boundary of %lu\n",
1154 body->size, de->d_inode->i_blksize);
1155 GOTO(out_file, rc = -EFAULT);
1158 /* body->nlink is actually the #bytes to read -eeb */
1159 if (body->nlink & (de->d_inode->i_blksize - 1)) {
1160 CERROR("size %u is not multiple of blocksize %lu\n",
1161 body->nlink, de->d_inode->i_blksize);
1162 GOTO(out_file, rc = -EFAULT);
1165 repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
1166 repbody->size = file->f_dentry->d_inode->i_size;
1167 repbody->valid = OBD_MD_FLSIZE;
1169 /* to make this asynchronous make sure that the handling function
1170 doesn't send a reply when this function completes. Instead a
1171 callback function would send the reply */
1172 /* body->size is actually the offset -eeb */
1173 rc = mds_sendpage(req, file, body->size, body->nlink);
1176 filp_close(file, 0);
1178 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1180 mds_exit_ucred(&uc, mds);
1181 req->rq_status = rc;
/* MDS_REINT handler: heap-allocates the (large) mds_update_record,
 * unpacks the request into it and dispatches to mds_reint_rec().
 * NOTE(review): fragment -- the OBD_ALLOC failure branch, `rc`
 * declaration and final return are not visible here. */
1185 int mds_reint(struct ptlrpc_request *req, int offset,
1186 struct lustre_handle *lockh)
1188 struct mds_update_record *rec; /* 116 bytes on the stack? no sir! */
1191 OBD_ALLOC(rec, sizeof(*rec));
1195 rc = mds_update_unpack(req, offset, rec);
1196 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1197 CERROR("invalid record\n");
1198 GOTO(out, req->rq_status = -EINVAL);
1201 /* rc will be used to interrupt a for loop over multiple records */
1202 rc = mds_reint_rec(rec, offset, req, lockh);
1204 OBD_FREE(rec, sizeof(*rec));
/* Decides whether a request arriving during recovery may be processed:
 * connect/disconnect (and, per the visible cases, sync) pass through;
 * replayable ops are queued via target_queue_recovery_request(); anything
 * else is rejected with -EAGAIN via ptlrpc_error().
 * NOTE(review): fragment -- several case labels, `*process` assignments
 * and RETURNs between the visible lines are missing from this view. */
1208 static int mds_filter_recovery_request(struct ptlrpc_request *req,
1209 struct obd_device *obd, int *process)
1211 switch (req->rq_reqmsg->opc) {
1212 case MDS_CONNECT: /* This will never get here, but for completeness. */
1213 case OST_CONNECT: /* This will never get here, but for completeness. */
1214 case MDS_DISCONNECT:
1215 case OST_DISCONNECT:
1220 case MDS_SYNC: /* used in unmounting */
1224 *process = target_queue_recovery_request(req, obd);
1228 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1230 /* XXX what should we set rq_status to here? */
1231 req->rq_status = -EAGAIN;
1232 RETURN(ptlrpc_error(req));
/* Human-readable names for REINT opcodes, indexed by opcode (GNU-style
 * designated initializers). Used for logging. */
1236 static char *reint_names[] = {
1237 [REINT_SETATTR] "setattr",
1238 [REINT_CREATE] "create",
1239 [REINT_LINK] "link",
1240 [REINT_UNLINK] "unlink",
1241 [REINT_RENAME] "rename",
1242 [REINT_OPEN] "open",
/* MDS_SET_INFO handler: currently only understands the "read-only" key,
 * toggling OBD_CONNECT_RDONLY on the export according to *val.
 * NOTE(review): fragment -- `key`/`keylen`/`val`/`rc` declarations, the
 * -EFAULT/-EINVAL returns and the final RETURN are not visible; the
 * memcmp length usage should be confirmed against the full source. */
1245 static int mds_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1252 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
1254 DEBUG_REQ(D_HA, req, "no set_info key");
1257 keylen = req->rq_reqmsg->buflens[0];
1259 val = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*val));
1261 DEBUG_REQ(D_HA, req, "no set_info val");
1265 rc = lustre_pack_reply(req, 0, NULL, NULL);
1268 req->rq_repmsg->status = 0;
1270 if (keylen < strlen("read-only") ||
1271 memcmp(key, "read-only", keylen) != 0)
1275 exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
1277 exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
/* MDS_QUOTACHECK handler: swabs the obd_quotactl request, packs an empty
 * reply and runs the quota check, storing the result in rq_status.
 * NOTE(review): fragment -- `rc` usage between the visible lines and the
 * final RETURN are not visible here. */
1282 static int mds_handle_quotacheck(struct ptlrpc_request *req)
1284 struct obd_quotactl *oqctl;
1288 oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl),
1289 lustre_swab_obd_quotactl);
1293 rc = lustre_pack_reply(req, 0, NULL, NULL);
1295 CERROR("mds: out of memory while packing quotacheck reply\n");
1299 req->rq_status = obd_quotacheck(req->rq_export, oqctl);
/* MDS_QUOTACTL handler: swabs the request, packs a reply holding an
 * obd_quotactl, executes the control operation and (per the upstream
 * code, on missing lines) copies the result into the reply buffer.
 * NOTE(review): fragment -- error returns and the repoqc copy-back are
 * not visible here. */
1303 static int mds_handle_quotactl(struct ptlrpc_request *req)
1305 struct obd_quotactl *oqctl, *repoqc;
1306 int rc, size = sizeof(*repoqc);
1309 oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl),
1310 lustre_swab_obd_quotactl);
1314 rc = lustre_pack_reply(req, 1, &size, NULL);
1318 repoqc = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repoqc));
1320 req->rq_status = obd_quotactl(req->rq_export, oqctl);
/* Verifies the incoming message's version field against the expected
 * interface version for its opcode class: OBD (connect/disconnect), MDS
 * (metadata ops), DLM (lock callbacks) or LOG (llog ops).  Unknown
 * opcodes are reported as errors.  The whole check is effectively a
 * placeholder: per the comment it is partially disabled for b1_4 compat.
 * NOTE(review): line-sampled fragment -- most case labels, `rc`
 * declaration, break statements and the return are not visible here. */
1325 static int mds_msg_check_version(struct lustre_msg *msg)
1329 /* TODO: enable the below check while really introducing msg version.
1330 * it's disabled because it will break compatibility with b1_4.
1336 case MDS_DISCONNECT:
1338 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1340 CERROR("bad opc %u version %08x, expecting %08x\n",
1341 msg->opc, msg->version, LUSTRE_OBD_VERSION);
1345 case MDS_GETATTR_NAME:
1350 case MDS_DONE_WRITING:
1356 case MDS_QUOTACHECK:
1360 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1362 CERROR("bad opc %u version %08x, expecting %08x\n",
1363 msg->opc, msg->version, LUSTRE_MDS_VERSION);
1367 case LDLM_BL_CALLBACK:
1368 case LDLM_CP_CALLBACK:
1369 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1371 CERROR("bad opc %u version %08x, expecting %08x\n",
1372 msg->opc, msg->version, LUSTRE_DLM_VERSION);
1374 case OBD_LOG_CANCEL:
1375 case LLOG_ORIGIN_HANDLE_CREATE:
1376 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1377 case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1378 case LLOG_ORIGIN_HANDLE_READ_HEADER:
1379 case LLOG_ORIGIN_HANDLE_CLOSE:
1381 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1383 CERROR("bad opc %u version %08x, expecting %08x\n",
1384 msg->opc, msg->version, LUSTRE_LOG_VERSION);
1387 CERROR("MDS unknown opcode %d\n", msg->opc);
/*
 * Per-opcode request handler descriptor and the helper macros that build
 * the dispatch tables below.
 */
1394 enum mdt_handler_flags {
1396 * struct mds_body is passed in the 0-th incoming buffer.
/* HABEO_CORPUS ("I have a body"): handler expects an mds_body in buf 0,
 * which mdt_req_handle() unpacks before dispatch. */
1398 HABEO_CORPUS = (1 << 0)
1401 struct mdt_handler {
1402 const char *mh_name;
/* mh_act is the handler callback itself; other fields (mh_fail_id,
 * mh_opc, mh_flags) are referenced by DEF_HNDL below but their
 * declarations are elided from this listing. */
1406 int (*mh_act)(struct mdt_thread_info *info,
1407 struct ptlrpc_request *req, int offset);
/* DEF_HNDL indexes the table entry by (opcode - family base opcode) so
 * the array can be addressed directly by opcode offset; it also derives
 * the OBD_FAIL_..._NET fault-injection id from the opcode name. */
1410 #define DEF_HNDL(prefix, base, flags, opc, fn) \
1411 [prefix ## _ ## opc - prefix ## _ ## base] = { \
1413 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## _NET, \
1414 .mh_opc = prefix ## _ ## opc, \
1415 .mh_flags = flags, \
/* MDS family entries are based at MDS_GETATTR. */
1419 #define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn)
/*
 * Dispatch table for the MDS opcode family, indexed by opcode offset from
 * MDS_GETATTR (see DEF_MDT_HNDL).  Handlers flagged HABEO_CORPUS have an
 * mds_body unpacked from the request before they run.
 */
1421 static struct mdt_handler mdt_mds_ops[] = {
1422 DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus),
1424 DEF_MDT_HNDL(0, CONNECT, mds_connect),
1425 DEF_MDT_HNDL(0, DISCONNECT, mds_disconnect),
1426 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mds_getattr),
1427 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mds_getattr_name),
1428 DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mds_setxattr),
1429 DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mds_getxattr),
1430 DEF_MDT_HNDL(0, STATFS, mds_statfs),
1431 DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mds_readpage),
1432 DEF_MDT_HNDL(0, REINT, mds_reint),
1433 DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mds_close),
1434 DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mds_done_writing),
1435 DEF_MDT_HNDL(0, PIN, mds_pin),
1436 DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mds_sync),
1437 DEF_MDT_HNDL(0, SET_INFO, mds_set_info),
1438 DEF_MDT_HNDL(0, QUOTACHECK, mds_handle_quotacheck),
1439 DEF_MDT_HNDL(0, QUOTACTL, mds_handle_quotactl)
/* Placeholder tables for the OBD, DLM and llog opcode families — empty in
 * this prototype, so those opcodes resolve to no handler. */
1442 static struct mdt_handler mdt_obd_ops[] = {
1445 static struct mdt_handler mdt_dlm_ops[] = {
1448 static struct mdt_handler mdt_llog_ops[] = {
/*
 * Maps each opcode range [mos_opc_start, mos_opc_end) onto its dispatch
 * table; consulted by mdt_handler_find() below.  mos_opc_end is declared
 * in an elided line of this listing.
 */
1451 static struct mdt_opc_slice {
1452 __u32 mos_opc_start;
1454 struct mdt_handler *mos_hs;
1455 } mdt_handlers[] = {
1457 .mos_opc_start = MDS_GETATTR,
1458 .mos_opc_end = MDS_LAST_OPC,
1459 .mos_hs = mdt_mds_ops
1462 .mos_opc_start = OBD_PING,
1463 .mos_opc_end = OBD_LAST_OPC,
1464 .mos_hs = mdt_obd_ops
1467 .mos_opc_start = LDLM_ENQUEUE,
1468 .mos_opc_end = LDLM_LAST_OPC,
1469 .mos_hs = mdt_dlm_ops
1472 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1473 .mos_opc_end = LLOG_LAST_OPC,
1474 .mos_hs = mdt_llog_ops
/*
 * Look up the handler for @opc by scanning the opcode-range slices in
 * mdt_handlers[].  Within a matching slice the handler is found by direct
 * offset (opc - slice start); the LASSERT cross-checks that the table was
 * built with the entry in the right slot.  Returns NULL for an opcode no
 * slice covers (unsupported).
 */
1478 struct mdt_handler *mdt_handler_find(__u32 opc)
1481 struct mdt_opc_slice *s;
1482 struct mdt_handler *h;
1485 for (i = 0, s = mdt_handlers; i < ARRAY_SIZE(mdt_handlers); i++, s++) {
1486 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1487 h = s->mos_hs + (opc - s->mos_opc_start);
1489 LASSERT(h->mh_opc == opc);
1491 h = NULL; /* unsupported opc */
/*
 * Find-or-create the mdt_object for fid @f in device @d's lu_site object
 * cache.  On lu_object_find() failure the error pointer is propagated to
 * the caller (the cast return on line 1504 presumably forwards an
 * ERR_PTR — the IS_ERR test on the elided line is not visible here).
 * On success, returns the embedding mdt_object with a reference held;
 * release it with mdt_object_put().
 */
1498 struct mdt_object *mdt_object_find(struct mdt_device *d, struct ll_fid *f)
1500 struct lu_object *o;
1502 o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f);
1504 return (struct mdt_object *)o;
1506 return container_of(o, struct mdt_object, mot_obj.mo_lu);
/* Drop the reference taken by mdt_object_find(). */
1509 void mdt_object_put(struct mdt_object *o)
1511 lu_object_put(&o->mot_obj.mo_lu);
/*
 * Run a single request through its resolved handler @h.
 *
 * Honors the handler's fault-injection id, then, for HABEO_CORPUS
 * handlers, unpacks the mds_body from the record buffer and pins the
 * target object (by fid1) into info->mti_object before invoking
 * h->mh_act().  The handler's result is stored unconditionally in
 * req->rq_status — see the XXX note below about the divergence from the
 * original per-handler conventions.
 *
 * NOTE(review): elided listing — the shift parameter declaration and the
 * early-exit paths between the visible statements are not shown.
 */
1514 static int mdt_req_handle(struct mdt_thread_info *info,
1515 struct mdt_handler *h, struct ptlrpc_request *req,
1523 LASSERT(h->mh_act != NULL);
1524 LASSERT(h->mh_opc == req->rq_reqmsg->opc);
1526 DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
/* Simulated network failure hook for this opcode, if one is defined. */
1528 if (h->mh_fail_id != 0)
1529 OBD_FAIL_RETURN(h->mh_fail_id, 0);
1531 off = MDS_REQ_REC_OFF + shift;
1533 if (h->mh_flags & HABEO_CORPUS) {
1534 info->mti_body = lustre_swab_reqbuf(req, off,
1535 sizeof *info->mti_body,
1536 lustre_swab_mds_body);
1537 if (info->mti_body == NULL) {
1538 CERROR("Can't unpack body\n");
1539 result = req->rq_status = -EFAULT;
/* Pin the object named by the request body so the handler can use it;
 * it is released later in mdt_thread_info_fini(). */
1541 info->mti_object = mdt_object_find(info->mti_mdt,
1542 &info->mti_body->fid1);
1543 if (IS_ERR(info->mti_object))
1544 result = PTR_ERR(info->mti_object);
1547 result = h->mh_act(info, req, off);
1549 * XXX result value is unconditionally shoved into ->rq_status
1550 * (original code sometimes placed error code into ->rq_status, and
1551 * sometimes returned it to the
1552 * caller). ptlrpc_server_handle_request() doesn't check return value
1555 req->rq_status = result;
/*
 * Reset per-request thread context: zero everything, arm the default
 * reply-side fault id, and poison the reply buffer size array with ~0 so
 * that use of an unset size is obvious.
 */
1559 static void mdt_thread_info_init(struct mdt_thread_info *info)
1561 memset(info, 0, sizeof *info);
1562 info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1564 * Poison size array.
1566 for (info->mti_rep_buf_nr = 0;
1567 info->mti_rep_buf_nr < MDT_REP_BUF_NR_MAX; info->mti_rep_buf_nr++)
1568 info->mti_rep_buf_size[info->mti_rep_buf_nr] = ~0;
/*
 * Tear down per-request context: drop the object reference taken in
 * mdt_req_handle(), if any.
 */
1571 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1573 if (info->mti_object != NULL) {
1574 mdt_object_put(info->mti_object);
1575 info->mti_object = NULL;
/*
 * Top-level request processing:
 *
 *   1. fault-injection gate and wire-version check;
 *   2. for any opcode but MDS_CONNECT, require a connected export,
 *      sanity-check resent/replayed xids, and route the request through
 *      recovery filtering when the target is recovering;
 *   3. dispatch via mdt_handler_find()/mdt_req_handle(), or reply
 *      -ENOTSUPP for an unknown opcode;
 *   4. stamp the reply with the client's last committed xid, queue the
 *      final reply during LAST_REPLAY, and send the reply.
 *
 * NOTE(review): elided listing — GOTO targets, the `out:` label, several
 * RETURN paths and local declarations are not visible here, so control
 * flow between the visible statements is partially inferred.
 */
1579 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
1582 struct mds_obd *mds = NULL; /* quell gcc overwarning */
1583 struct obd_device *obd = NULL;
1584 struct mdt_handler *h;
1588 OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
/* A handler must never leave a journal handle dangling across requests. */
1590 LASSERT(current->journal_info == NULL);
1592 rc = mds_msg_check_version(req->rq_reqmsg);
1594 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
1598 /* XXX identical to OST */
/* Everything except CONNECT requires an established export. */
1599 if (req->rq_reqmsg->opc != MDS_CONNECT) {
1600 struct mds_export_data *med;
1601 int recovering, abort_recovery;
1603 if (req->rq_export == NULL) {
1604 CERROR("operation %d on unconnected MDS from %s\n",
1605 req->rq_reqmsg->opc,
1606 libcfs_id2str(req->rq_peer),
1607 req->rq_status = -ENOTCONN;
1608 GOTO(out, rc = -ENOTCONN);
1611 med = &req->rq_export->exp_mds_data;
1612 obd = req->rq_export->exp_obd;
1615 /* sanity check: if the xid matches, the request must
1616 * be marked as a resent or replayed */
1617 if (req->rq_xid == med->med_mcd->mcd_last_xid)
1618 LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
1619 (MSG_RESENT | MSG_REPLAY),
1620 "rq_xid "LPU64" matches last_xid, "
1621 "expected RESENT flag\n",
1623 /* else: note the opposite is not always true; a
1624 * RESENT req after a failover will usually not match
1625 * the last_xid, since it was likely never
1626 * committed. A REPLAYed request will almost never
1627 * match the last xid, however it could for a
1628 * committed, but still retained, open. */
1630 /* Check for aborted recovery. */
/* Snapshot recovery state under the processing-task lock. */
1631 spin_lock_bh(&obd->obd_processing_task_lock);
1632 abort_recovery = obd->obd_abort_recovery;
1633 recovering = obd->obd_recovering;
1634 spin_unlock_bh(&obd->obd_processing_task_lock);
1635 if (abort_recovery) {
1636 target_abort_recovery(obd);
1637 } else if (recovering) {
/* During recovery, only replay/recovery-eligible requests proceed. */
1640 rc = mds_filter_recovery_request(req, obd,
1642 if (rc || !should_process)
1647 h = mdt_handler_find(req->rq_reqmsg->opc);
1649 rc = mdt_req_handle(info, h, req, 0);
1651 req->rq_status = -ENOTSUPP;
1652 rc = ptlrpc_error(req);
1656 LASSERT(current->journal_info == NULL);
1658 /* If we're DISCONNECTing, the mds_export_data is already freed */
1659 if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
1660 struct mds_export_data *med = &req->rq_export->exp_mds_data;
/* Tell the client the last xid this server has seen from it. */
1661 req->rq_repmsg->last_xid =
1662 le64_to_cpu(med->med_mcd->mcd_last_xid);
1664 target_committed_to_req(req);
/* Final replay of recovery: queue the reply until recovery completes. */
1670 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1671 if (obd && obd->obd_recovering) {
1672 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1673 RETURN(target_queue_final_reply(req, rc));
1675 /* Lost a race with recovery; let the error path DTRT. */
1676 rc = req->rq_status = -ENOTCONN;
1679 target_send_reply(req, rc, info->mti_fail_id);
/* Forward declaration: operations vector defined after the helpers below. */
1683 static struct lu_device_operations mdt_lu_ops;
/*
 * True iff @d is an MDT device — currently identified by its ops vector,
 * per the XXX note (proper device-type tags are planned instead).
 */
1685 static int lu_device_is_mdt(struct lu_device *d)
1688 * XXX for now. Tags in lu_device_type->ldt_something are needed.
1690 return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
/* Downcast lu_object -> embedding mdt_object (asserts the device type). */
1693 static struct mdt_object *mdt_obj(struct lu_object *o)
1695 LASSERT(lu_device_is_mdt(o->lo_dev));
1696 return container_of(o, struct mdt_object, mot_obj.mo_lu);
/* Downcast lu_device -> embedding mdt_device (asserts the device type). */
1699 static struct mdt_device *mdt_dev(struct lu_device *d)
1701 LASSERT(lu_device_is_mdt(d));
1702 return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
/*
 * ptlrpc service entry point: set up a per-request mdt_thread_info on the
 * stack (see the XXX — presumably meant to move to per-thread storage),
 * resolve the target mdt_device from the request's export, run
 * mdt_handle0(), and tear the context down.
 */
1705 int mdt_handle(struct ptlrpc_request *req)
1709 struct mdt_thread_info info; /* XXX on stack for now */
1710 mdt_thread_info_init(&info);
1711 info.mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1713 result = mdt_handle0(req, &info);
1715 mdt_thread_info_fini(&info);
/*
 * LDLM intent policy stub: aborts every intent lock request.  Real intent
 * handling is not implemented in this prototype.
 */
1719 static int mdt_intent_policy(struct ldlm_namespace *ns,
1720 struct ldlm_lock **lockp, void *req_cookie,
1721 ldlm_mode_t mode, int flags, void *data)
1723 RETURN(ELDLM_LOCK_ABORTED);
/*
 * Convenience wrapper: start a ptlrpc service from a packed configuration
 * struct instead of the long ptlrpc_init_svc() argument list.
 */
1726 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
1727 svc_handler_t h, char *name,
1728 struct proc_dir_entry *proc_entry,
1729 svcreq_printfn_t prntfn)
1731 return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
1732 c->psc_max_req_size, c->psc_max_reply_size,
1733 c->psc_req_portal, c->psc_rep_portal,
1734 c->psc_watchdog_timeout,
1735 h, name, proc_entry,
1736 prntfn, c->psc_num_threads);
/* Thin init/fini wrappers around the embedded lu_device of an md_device. */
1739 int md_device_init(struct md_device *md, struct lu_device_type *t)
1741 return lu_device_init(&md->md_lu_dev, t);
1744 void md_device_fini(struct md_device *md)
1746 lu_device_fini(&md->md_lu_dev);
/*
 * Tear down an mdt_device: release the lu_site, stop and unregister the
 * ptlrpc service, free the LDLM namespace, and finalize the embedded
 * md_device.  All references must be gone by this point (ld_ref == 0).
 * Each sub-teardown is guarded so partial-initialization failures from
 * mdt_init0() can reuse this path.
 */
1749 static void mdt_fini(struct lu_device *d)
1751 struct mdt_device *m = mdt_dev(d);
1753 if (d->ld_site != NULL) {
1754 lu_site_fini(d->ld_site);
1757 if (m->mdt_service != NULL) {
1758 ptlrpc_unregister_service(m->mdt_service);
1759 m->mdt_service = NULL;
1761 if (m->mdt_namespace != NULL) {
1762 ldlm_namespace_free(m->mdt_namespace, 0);
1763 m->mdt_namespace = NULL;
1766 LASSERT(atomic_read(&d->ld_ref) == 0);
1767 md_device_fini(&m->mdt_md_dev);
/*
 * First-stage mdt_device initialization:
 *
 *   - initialize the embedded md_device and install mdt_lu_ops;
 *   - fill in the ptlrpc service configuration (buffer counts/sizes,
 *     portals, watchdog timeout, thread count derived from the
 *     mdt_num_threads module parameter);
 *   - set up the lu_site, a server-side LDLM namespace with the intent
 *     policy stub, and the ldlm callback client;
 *   - register the ptlrpc service and start its threads.
 *
 * NOTE(review): elided listing — local declarations (s, ns_name), error
 * GOTOs and the clamp bounds on the thread count are not visible here,
 * so cleanup-on-failure behavior cannot be confirmed from this view.
 */
1770 static int mdt_init0(struct mdt_device *m,
1771 struct lu_device_type *t, struct lustre_cfg *cfg)
1782 md_device_init(&m->mdt_md_dev, t);
1784 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1786 m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
1787 m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE;
1788 m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE;
1789 m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE;
1790 m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL;
1791 m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL;
1792 m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
1794 * We'd like to have a mechanism to set this on a per-device basis,
1797 m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
1800 lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
/* Server-side lock namespace, uniquely named by device pointer. */
1802 snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1803 m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1804 if (m->mdt_namespace == NULL)
1806 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1808 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1809 "mdt_ldlm_client", &m->mdt_ldlm_client);
1812 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1814 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1816 if (m->mdt_service == NULL)
1819 return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
/*
 * ->ldo_object_alloc: allocate an mdt_object for device @d, initialize
 * its header and embedded lu_object, and install it as the top of the
 * object stack.  NOTE(review): the allocation call and the failure-path
 * return are elided from this listing.
 */
1822 struct lu_object *mdt_object_alloc(struct lu_device *d)
1824 struct mdt_object *mo;
1828 struct lu_object *o;
1829 struct lu_object_header *h;
1831 o = &mo->mot_obj.mo_lu;
1832 h = &mo->mot_header;
1833 lu_object_header_init(h);
1834 lu_object_init(o, h, d);
1835 /* ->lo_depth and ->lo_flags are automatically 0 */
1836 lu_object_add_top(h, o);
/*
 * ->ldo_object_init: ask the child md device to allocate its layer of the
 * object and stack it below this one.  NOTE(review): the failure return
 * (below == NULL) is elided from this listing.
 */
1842 int mdt_object_init(struct lu_object *o)
1844 struct mdt_device *d = mdt_dev(o->lo_dev);
1845 struct lu_device *under;
1846 struct lu_object *below;
1848 under = &d->mdt_child->md_lu_dev;
1849 below = under->ld_ops->ldo_object_alloc(under);
1850 if (below != NULL) {
1851 lu_object_add(o, below);
/*
 * ->ldo_object_free: finalize the object header (the lu_object_fini and
 * memory release are elided from this listing).
 */
1857 void mdt_object_free(struct lu_object *o)
1859 struct lu_object_header *h;
1863 lu_object_header_fini(h);
/* ->ldo_object_release: no-op — nothing to do when the last ref drops. */
1866 void mdt_object_release(struct lu_object *o)
/* ->ldo_object_print: one-line identification for debugging output. */
1870 int mdt_object_print(struct seq_file *f, const struct lu_object *o)
1872 return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
/* lu_device operations vector for the MDT layer (also used as the device
 * identity tag by lu_device_is_mdt()). */
1875 static struct lu_device_operations mdt_lu_ops = {
1876 .ldo_object_alloc = mdt_object_alloc,
1877 .ldo_object_init = mdt_object_init,
1878 .ldo_object_free = mdt_object_free,
1879 .ldo_object_release = mdt_object_release,
1880 .ldo_object_print = mdt_object_print
/* Fid of an mdt_object, as recorded in its embedded lu_object. */
1883 static struct ll_fid *mdt_object_fid(struct mdt_object *o)
1885 return lu_object_fid(&o->mot_obj.mo_lu);
/* Take / release a fid-based DLM lock on the object in the given mode,
 * using the object's own lock handle. */
1888 static int mdt_object_lock(struct mdt_object *o, ldlm_mode_t mode)
1890 return fid_lock(mdt_object_fid(o), &o->mot_lh, mode);
1893 static void mdt_object_unlock(struct mdt_object *o, ldlm_mode_t mode)
1895 fid_unlock(mdt_object_fid(o), &o->mot_lh, mode);
/*
 * Create directory @name under the parent identified by @pfid: pin the
 * parent object, take a PW lock on it, delegate the mkdir to the child
 * md layer, then unlock.  NOTE(review): error checks on the find/lock
 * results and the object put are elided from this listing.
 */
1898 int mdt_mkdir(struct mdt_device *d, struct ll_fid *pfid, const char *name)
1900 struct mdt_object *o;
1903 o = mdt_object_find(d, pfid);
1906 result = mdt_object_lock(o, LCK_PW);
1908 result = d->mdt_child->md_ops->mdo_mkdir(&o->mot_obj, name);
1909 mdt_object_unlock(o, LCK_PW);
/* Minimal OBD operations vector — only module ownership is needed here. */
1915 static struct obd_ops mdt_obd_device_ops = {
1916 .o_owner = THIS_MODULE
/*
 * ->ldto_device_alloc: allocate an mdt_device and run mdt_init0() on it.
 * On init failure the result is converted to an ERR_PTR; allocation
 * failure yields ERR_PTR(-ENOMEM).  NOTE(review): the OBD_ALLOC call and
 * cleanup between the visible lines are elided from this listing.
 */
1919 struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1920 struct lustre_cfg *cfg)
1922 struct lu_device *l;
1923 struct mdt_device *m;
1929 l = &m->mdt_md_dev.md_lu_dev;
1930 result = mdt_init0(m, t, cfg);
1933 m = ERR_PTR(result);
1936 l = ERR_PTR(-ENOMEM);
/* ->ldto_device_free: teardown counterpart (body elided in this listing). */
1940 void mdt_device_free(struct lu_device *m)
/* Device-type init/fini hooks — no type-global state in this prototype. */
1946 int mdt_type_init(struct lu_device_type *t)
1951 void mdt_type_fini(struct lu_device_type *t)
1955 static struct lu_device_type_operations mdt_device_type_ops = {
1956 .ldto_init = mdt_type_init,
1957 .ldto_fini = mdt_type_fini,
1959 .ldto_device_alloc = mdt_device_alloc,
1960 .ldto_device_free = mdt_device_free
/* The MDT lu device type, installed on the obd_type in mdt_mod_init(). */
1963 static struct lu_device_type mdt_device_type = {
1964 .ldt_name = LUSTRE_MDT0_NAME,
1965 .ldt_ops = &mdt_device_type_ops
/*
 * Module init: apply the default thread count, set up lprocfs variables,
 * register the MDT obd type, attach the lu device type to it, and run the
 * device-type init hook.  On hook failure the obd type registration is
 * rolled back.  NOTE(review): error checks between the visible calls are
 * elided from this listing.
 */
1968 static int __init mdt_mod_init(void)
1970 struct lprocfs_static_vars lvars;
1971 struct obd_type *type;
1974 mdt_num_threads = MDT_NUM_THREADS;
1975 lprocfs_init_vars(mdt, &lvars);
1976 result = class_register_type(&mdt_obd_device_ops,
1977 lvars.module_vars, LUSTRE_MDT0_NAME);
/* Fetch the type we just registered so its typ_lu can be installed. */
1979 type = class_get_type(LUSTRE_MDT0_NAME);
1980 LASSERT(type != NULL);
1981 type->typ_lu = &mdt_device_type;
1982 result = type->typ_lu->ldt_ops->ldto_init(type->typ_lu);
1984 class_unregister_type(LUSTRE_MDT0_NAME);
/* Module exit: undo the obd type registration. */
1989 static void __exit mdt_mod_exit(void)
1991 class_unregister_type(LUSTRE_MDT0_NAME);
/* Module metadata, the mdt_num_threads tunable (read-only in sysfs, 0444),
 * and module entry/exit registration. */
1994 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1995 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1996 MODULE_LICENSE("GPL");
1998 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1999 "number of mdt service threads to start");
2001 cfs_module(mdt, "0.0.2", mdt_mod_init, mdt_mod_exit);