2 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * Lustre Metadata Target (mdt) request handler
8 * Copyright (c) 2006 Cluster File Systems, Inc.
9 * Author: Peter Braam <braam@clusterfs.com>
10 * Author: Andreas Dilger <adilger@clusterfs.com>
11 * Author: Phil Schwan <phil@clusterfs.com>
12 * Author: Mike Shaver <shaver@clusterfs.com>
13 * Author: Nikita Danilov <nikita@clusterfs.com>
15 * This file is part of the Lustre file system, http://www.lustre.org
16 * Lustre is a trademark of Cluster File Systems, Inc.
18 * You may have signed or agreed to another license before downloading
19 * this software. If so, you are bound by the terms and conditions
20 * of that agreement, and the following does not apply to you. See the
21 * LICENSE file included with this distribution for more information.
23 * If you did not agree to a different license, then this copy of Lustre
24 * is open source software; you can redistribute it and/or modify it
25 * under the terms of version 2 of the GNU General Public License as
26 * published by the Free Software Foundation.
28 * In either case, Lustre is distributed in the hope that it will be
29 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 * license text for more details.
35 # define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_MDS
39 #include <linux/module.h>
40 #include <linux/lu_object.h>
/* Negotiate connection flags with a client: clamp the requested connect
 * flags and inode lock bits to what this MDS supports, then record the
 * agreed flags on the export.
 * NOTE(review): this listing has lines elided (braces / return paths not
 * visible); comments describe only the visible code. */
46 static int mdt_connect_internal(struct obd_export *exp,
47 struct obd_connect_data *data)
49 struct obd_device *obd = exp->exp_obd;
        /* Keep only mutually supported flags and lock bits. */
51 data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
52 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
54 /* If no known bits (which should not happen, probably,
55 as everybody should support LOOKUP and UPDATE bits at least)
56 revert to compat mode with plain locks. */
57 if (!data->ocd_ibits_known &&
58 data->ocd_connect_flags & OBD_CONNECT_IBITS)
59 data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
        /* Strip ACL / user-xattr support when disabled server-side. */
61 if (!obd->u.mds.mdt_fl_acl)
62 data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
64 if (!obd->u.mds.mdt_fl_user_xattr)
65 data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
        /* Record the negotiated result on the export and advertise our
         * version back to the client. */
67 exp->exp_connect_flags = data->ocd_connect_flags;
68 data->ocd_version = LUSTRE_VERSION_CODE;
69 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
        /* Server mandates ACLs but this client cannot do them: warn. */
72 if (obd->u.mds.mdt_fl_acl &&
73 ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
74 CWARN("%s: MDS requires ACL support but client does not\n",
/* Re-establish an existing client connection (e.g. after failover):
 * only the connect flags are re-negotiated on the existing export. */
81 static int mdt_reconnect(struct obd_export *exp, struct obd_device *obd,
82 struct obd_uuid *cluuid,
83 struct obd_connect_data *data)
        /* Defensive NULL checks on all inputs. */
88 if (exp == NULL || obd == NULL || cluuid == NULL)
91 rc = mdt_connect_internal(exp, data);
96 /* Establish a connection to the MDS.
98 * This will set up an export structure for the client to hold state data
99 * about that client, like open files, the last operation number it did
100 * on the server, etc.
102 static int mdt_connect(struct lustre_handle *conn, struct obd_device *obd,
103 struct obd_uuid *cluuid, struct obd_connect_data *data)
105 struct obd_export *exp;
106 struct mdt_export_data *med;
107 struct mdt_client_data *mcd = NULL;
108 int rc, abort_recovery;
111 if (!conn || !obd || !cluuid)
114 /* Check for aborted recovery. */
115 spin_lock_bh(&obd->obd_processing_task_lock);
116 abort_recovery = obd->obd_abort_recovery;
117 spin_unlock_bh(&obd->obd_processing_task_lock);
119 target_abort_recovery(obd);
121 /* XXX There is a small race between checking the list and adding a
122 * new connection for the same UUID, but the real threat (list
123 * corruption when multiple different clients connect) is solved.
125 * There is a second race between adding the export to the list,
126 * and filling in the client data below. Hence skipping the case
127 * of NULL mcd above. We should already be controlling multiple
128 * connects at the client, and we can't hold the spinlock over
129 * memory allocations without risk of deadlocking.
131 rc = class_connect(conn, obd, cluuid);
134 exp = class_conn2export(conn);
136 med = &exp->exp_mdt_data;
        /* Negotiate flags, then persist per-client recovery data. */
138 rc = mdt_connect_internal(exp, data);
144 GOTO(out, rc = -ENOMEM);
146 memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
        /* Register the client in the last_rcvd file; -1 = allocate slot. */
149 rc = mdt_client_add(obd, &obd->u.mds, med, -1);
        /* Error path: undo the class_connect() above. */
158 class_disconnect(exp);
160 class_export_put(exp);
/* Initialize per-export MDT state: the list of open file handles and the
 * spinlock protecting it; mark the export as mid-connect. */
166 int mdt_init_export(struct obd_export *exp)
168 struct mdt_export_data *med = &exp->exp_mdt_data;
170 INIT_LIST_HEAD(&med->med_open_head);
171 spin_lock_init(&med->med_open_lock);
172 exp->exp_connecting = 1;
/* Tear down an export: force-close every file the client still had open
 * (which may trigger orphan unlinking) and free its last_rcvd slot.
 * NOTE(review): lines are elided in this listing; only visible code is
 * described. */
176 static int mdt_destroy_export(struct obd_export *export)
178 struct mdt_export_data *med;
179 struct obd_device *obd = export->exp_obd;
180 struct lvfs_run_ctxt saved;
184 med = &export->exp_mdt_data;
185 target_destroy_export(export);
        /* The self-export (obd's own UUID) owns no client state. */
187 if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
190 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
191 /* Close any open files (which may also cause orphan unlinking). */
192 spin_lock(&med->med_open_lock);
193 while (!list_empty(&med->med_open_head)) {
194 struct list_head *tmp = med->med_open_head.next;
195 struct mdt_file_data *mfd =
196 list_entry(tmp, struct mdt_file_data, mfd_list);
197 struct dentry *dentry = mfd->mfd_dentry;
199 /* Remove mfd handle so it can't be found again.
200 * We are consuming the mfd_list reference here. */
201 mdt_mfd_unlink(mfd, 0);
        /* Drop the lock around the (blocking) close, re-take it below. */
202 spin_unlock(&med->med_open_lock);
204 /* If you change this message, be sure to update
205 * replay_single:test_46 */
206 CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for "
207 "%.*s (ino %lu)\n", obd->obd_name, dentry->d_name.len,
208 dentry->d_name.name, dentry->d_inode->i_ino);
209 /* child orphan sem protects orphan_dec_test and
210 * is_orphan race, mdt_mfd_close drops it */
211 MDT_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode);
212 rc = mdt_mfd_close(NULL, MDS_REQ_REC_OFF, obd, mfd,
213 !(export->exp_flags & OBD_OPT_FAILOVER));
216 CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
217 spin_lock(&med->med_open_lock);
219 spin_unlock(&med->med_open_lock);
220 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
221 mdt_client_free(export);
/* Disconnect a client: invalidate the export first so no new requests
 * can use it, cancel its DLM locks, then force out any replies that are
 * still waiting for commit acknowledgement. */
226 static int mdt_disconnect(struct obd_export *exp)
228 unsigned long irqflags;
        /* Hold a ref across teardown; dropped at the end. */
233 class_export_get(exp);
235 /* Disconnect early so that clients can't keep using export */
236 rc = class_disconnect(exp);
237 ldlm_cancel_locks_for_export(exp);
239 /* complete all outstanding replies */
240 spin_lock_irqsave(&exp->exp_lock, irqflags);
241 while (!list_empty(&exp->exp_outstanding_replies)) {
242 struct ptlrpc_reply_state *rs =
243 list_entry(exp->exp_outstanding_replies.next,
244 struct ptlrpc_reply_state, rs_exp_list);
245 struct ptlrpc_service *svc = rs->rs_service;
        /* Per-service lock ordering: svc lock nests inside exp lock. */
247 spin_lock(&svc->srv_lock);
248 list_del_init(&rs->rs_exp_list);
249 ptlrpc_schedule_difficult_reply(rs);
250 spin_unlock(&svc->srv_lock);
252 spin_unlock_irqrestore(&exp->exp_lock, irqflags);
254 class_export_put(exp);
/* MDS_GETSTATUS handler: reply with the fid of the filesystem root,
 * obtained from the underlying md device. */
258 static int mdt_getstatus(struct mdt_thread_info *info,
259 struct ptlrpc_request *req)
261 struct md_device *mdd = info->mti_mdt->mdt_mdd;
262 int size = sizeof *body;
263 struct mds_body *body;
268 result = lustre_pack_reply(req, 1, &size, NULL);
270 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
272 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
        /* Fill the root fid directly into the reply body. */
275 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
276 result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
279 /* the last_committed and last_xid fields are filled in for all
280 * replies already - no need to do so here also.
/* Fill the reply's mds_body (and optional EA / symlink-target buffers)
 * from @dentry's inode.  The reply message must already be packed by
 * the caller; @reply_off selects which reply buffer to fill. */
285 static int mdt_getattr_internal(struct obd_device *obd, struct dentry *dentry,
286 struct ptlrpc_request *req,
287 struct mds_body *reqbody, int reply_off)
289 struct mds_body *body;
290 struct inode *inode = dentry->d_inode;
297 body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
298 LASSERT(body != NULL); /* caller prepped reply */
300 mdt_pack_inode2fid(&body->fid1, inode);
301 mdt_pack_inode2body(body, inode);
        /* Striping EA requested: regular files use FLEASIZE, dirs FLDIREA. */
304 if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
305 (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
306 rc = mdt_pack_md(obd, req->rq_repmsg, reply_off, body,
309 /* If we have LOV EA data, the OST holds size, atime, mtime */
310 if (!(body->valid & OBD_MD_FLEASIZE) &&
311 !(body->valid & OBD_MD_FLDIREA))
312 body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
313 OBD_MD_FLATIME | OBD_MD_FLMTIME);
        /* Trim the reply buffer down to the EA bytes actually used. */
315 lustre_shrink_reply(req, reply_off, body->eadatasize, 0);
316 if (body->eadatasize)
318 } else if (S_ISLNK(inode->i_mode) &&
319 (reqbody->valid & OBD_MD_LINKNAME) != 0) {
        /* Symlink target goes into the buffer after the body. */
320 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0);
323 LASSERT (symname != NULL); /* caller prepped reply */
324 len = req->rq_repmsg->buflens[reply_off];
326 rc = inode->i_op->readlink(dentry, symname, len);
328 CERROR("readlink failed: %d\n", rc);
329 } else if (rc != len - 1) {
330 CERROR ("Unexpected readlink rc %d: expecting %d\n",
334 CDEBUG(D_INODE, "read symlink dest %s\n", symname);
335 body->valid |= OBD_MD_LINKNAME;
336 body->eadatasize = rc + 1;
337 symname[rc] = 0; /* NULL terminate */
        /* Let the client size its EA/cookie buffers for later RPCs. */
343 if (reqbody->valid & OBD_MD_FLMODEASIZE) {
344 struct mdt_obd *mds = mdt_req2mds(req);
345 body->max_cookiesize = mds->mdt_max_cookiesize;
346 body->max_mdsize = mds->mdt_max_mdsize;
347 body->valid |= OBD_MD_FLMODEASIZE;
/* Size and pack the getattr reply for @inode: always one mds_body
 * buffer, plus a second buffer sized for the striping EA (files/dirs)
 * or the symlink target, depending on what the request asked for. */
356 static int mdt_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
359 struct mdt_obd *mds = mdt_req2mds(req);
360 struct mds_body *body;
361 int rc, size[2] = {sizeof(*body)}, bufcount = 1;
364 body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
365 LASSERT(body != NULL); /* checked by caller */
366 LASSERT_REQSWABBED(req, offset); /* swabbed by caller */
368 if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
369 (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
        /* Probe the on-disk EA size under the inode mutex. */
370 LOCK_INODE_MUTEX(inode);
371 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0,
373 UNLOCK_INODE_MUTEX(inode);
374 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
377 if (rc != -ENODATA) {
378 CERROR("error getting inode %lu MD: rc = %d\n",
383 } else if (rc > mds->mdt_max_mdsize) {
385 CERROR("MD size %d larger than maximum possible %u\n",
386 rc, mds->mdt_max_mdsize);
391 } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
        /* Symlink: reserve room for the target plus trailing NUL; the
         * client told us how much reply space it allocated. */
392 if (inode->i_size + 1 != body->eadatasize)
393 CERROR("symlink size: %Lu, reply space: %d\n",
394 inode->i_size + 1, body->eadatasize);
395 size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
397 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
398 inode->i_size + 1, body->eadatasize);
401 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
402 CERROR("failed MDT_GETATTR_PACK test\n");
403 req->rq_status = -ENOMEM;
407 rc = lustre_pack_reply(req, bufcount, size, NULL);
409 CERROR("lustre_pack_reply failed: rc %d\n", rc);
/* MDS_GETATTR_NAME / intent-getattr handler: look up @name under the
 * parent fid (taking parent UPDATE + child locks), or revalidate by
 * child fid, then pack the child's attributes into the reply.  Handles
 * RESENT requests by reusing the already-granted child lock.
 * NOTE(review): lines are elided in this listing; cleanup labels and
 * some branches are not visible. */
417 static int mdt_getattr_name(int offset, struct ptlrpc_request *req,
418 int child_part, struct lustre_handle *child_lockh)
420 struct obd_device *obd = req->rq_export->exp_obd;
421 struct mdt_obd *mds = &obd->u.mds;
422 struct ldlm_reply *rep = NULL;
423 struct lvfs_run_ctxt saved;
424 struct mds_body *body;
425 struct dentry *dparent = NULL, *dchild = NULL;
426 struct lvfs_ucred uc = {NULL,};
427 struct lustre_handle parent_lockh;
429 int rc = 0, cleanup_phase = 0, resent_req = 0;
433 LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDT_NAME));
435 /* Swab now, before anyone looks inside the request */
437 body = lustre_swab_reqbuf(req, offset, sizeof(*body),
438 lustre_swab_mdt_body);
440 CERROR("Can't swab mdt_body\n");
444 LASSERT_REQSWAB(req, offset + 1);
445 name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
447 CERROR("Can't unpack name\n");
450 namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
        /* Run under the requesting user's credentials. */
452 rc = mdt_init_ucred(&uc, req, offset);
456 LASSERT (offset == MDS_REQ_REC_OFF || offset == MDS_REQ_INTENT_REC_OFF);
457 /* if requests were at offset 2, the getattr reply goes back at 1 */
458 if (offset == MDS_REQ_INTENT_REC_OFF) {
459 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
463 push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
464 cleanup_phase = 1; /* kernel context */
465 intent_set_disposition(rep, DISP_LOOKUP_EXECD);
        /* A used child lock handle means this is a RESENT request. */
467 if (lustre_handle_is_used(child_lockh)) {
468 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
472 if (resent_req == 0) {
474 rc = mdt_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
475 &parent_lockh, &dparent,
477 MDS_INODELOCK_UPDATE,
479 child_lockh, &dchild, LCK_CR,
482 /* For revalidate by fid we always take UPDATE lock */
483 dchild = mdt_fid2locked_dentry(obd, &body->fid2, NULL,
486 MDT_INODELOCK_UPDATE);
489 rc = PTR_ERR(dchild);
        /* RESENT: recover the child fid from the lock we already hold
         * instead of enqueueing new locks. */
494 struct ldlm_lock *granted_lock;
495 struct ll_fid child_fid;
496 struct ldlm_resource *res;
497 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
498 granted_lock = ldlm_handle2lock(child_lockh);
499 LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
500 body->fid1.id, body->fid1.generation,
501 child_lockh->cookie);
504 res = granted_lock->l_resource;
505 child_fid.id = res->lr_name.name[0];
506 child_fid.generation = res->lr_name.name[1];
507 dchild = mdt_fid2dentry(&obd->u.mds, &child_fid, NULL);
508 LASSERT(!IS_ERR(dchild));
509 LDLM_LOCK_PUT(granted_lock);
512 cleanup_phase = 2; /* dchild, dparent, locks */
514 if (dchild->d_inode == NULL) {
515 intent_set_disposition(rep, DISP_LOOKUP_NEG);
516 /* in the intent case, the policy clears this error:
517 the disposition is enough */
518 GOTO(cleanup, rc = -ENOENT);
520 intent_set_disposition(rep, DISP_LOOKUP_POS);
        /* Intent path may not have packed the reply yet. */
523 if (req->rq_repmsg == NULL) {
524 rc = mdt_getattr_pack_msg(req, dchild->d_inode, offset);
526 CERROR ("mdt_getattr_pack_msg: %d\n", rc);
531 rc = mdt_getattr_internal(obd, dchild, req, body, offset);
532 GOTO(cleanup, rc); /* returns the lock to the client */
535 switch (cleanup_phase) {
537 if (resent_req == 0) {
538 if (rc && dchild->d_inode)
539 ldlm_lock_decref(child_lockh, LCK_CR);
540 ldlm_lock_decref(&parent_lockh, LCK_CR);
545 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
547 mds_exit_ucred(&uc, mds);
        /* Make sure some reply exists even on early failure. */
548 if (req->rq_reply_state == NULL) {
550 lustre_pack_reply(req, 0, NULL, NULL);
/* Plain MDS_GETATTR by fid: resolve the dentry, pack a suitably sized
 * reply, and fill it from the inode.  Runs under the caller's creds. */
556 static int mds_getattr(struct ptlrpc_request *req, int offset)
558 struct mds_obd *mds = mds_req2mds(req);
559 struct obd_device *obd = req->rq_export->exp_obd;
560 struct lvfs_run_ctxt saved;
562 struct mds_body *body;
563 struct lvfs_ucred uc = {NULL,};
567 body = lustre_swab_reqbuf(req, offset, sizeof(*body),
568 lustre_swab_mds_body);
572 rc = mds_init_ucred(&uc, req, offset);
576 push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
577 de = mds_fid2dentry(mds, &body->fid1, NULL);
579 rc = req->rq_status = PTR_ERR(de);
583 rc = mds_getattr_pack_msg(req, de->d_inode, offset);
585 CERROR("mds_getattr_pack_msg: %d\n", rc);
        /* Reply always goes in buffer 0 for a top-level getattr. */
589 req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
594 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
596 if (req->rq_reply_state == NULL) {
598 lustre_pack_reply(req, 0, NULL, NULL);
600 mds_exit_ucred(&uc, mds);
/* Fill @osfs from the backing filesystem, using the cached obd_osfs
 * when it is younger than @max_age.  The osfs lock serializes readers
 * against the refresh done by fsfilt_statfs(). */
605 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
606 unsigned long max_age)
610 spin_lock(&obd->obd_osfs_lock);
611 rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age);
613 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
614 spin_unlock(&obd->obd_osfs_lock);
/* MDS_STATFS handler: pack a one-buffer reply and fill it with
 * (possibly cached) filesystem statistics. */
619 static int mds_statfs(struct ptlrpc_request *req)
621 struct obd_device *obd = req->rq_export->exp_obd;
622 int rc, size = sizeof(struct obd_statfs);
625 /* This will trigger a watchdog timeout */
626 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
627 (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
629 rc = lustre_pack_reply(req, 1, &size, NULL);
630 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
631 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
635 /* We call this so that we can cache a bit - 1 jiffie worth */
636 rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size),
639 CERROR("mds_obd_statfs failed: rc %d\n", rc);
/* MDS_SET_INFO handler.  Only the "read-only" key is understood; it
 * toggles OBD_CONNECT_RDONLY on the export according to *val. */
649 static int mds_set_info(struct obd_export *exp, struct ptlrpc_request *req)
656 key = lustre_msg_buf(req->rq_reqmsg, 0, 1)
658 DEBUG_REQ(D_HA, req, "no set_info key");
661 keylen = req->rq_reqmsg->buflens[0];
663 val = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*val));
665 DEBUG_REQ(D_HA, req, "no set_info val");
669 rc = lustre_pack_reply(req, 0, NULL, NULL);
672 req->rq_repmsg->status = 0;
        /* NOTE(review): memcmp over keylen bytes compares past the
         * literal when keylen > strlen("read-only") — confirm the
         * intended semantics against other set_info handlers. */
674 if (keylen < strlen("read-only") ||
675 memcmp(key, "read-only", keylen) != 0)
679 exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
681 exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
/* Per-handler behaviour flags consulted by mdt_req_handle(). */
686 enum mdt_handler_flags {
688 * struct mds_body is passed in the 0-th incoming buffer.
690 HABEO_CORPUS = (1 << 0)
        /* Handler callback: (per-thread info, incoming request). */
698 int (*mh_act)(struct mdt_thread_info *info, struct ptlrpc_request *req);
/* Build one mdt_handler table entry for opcode <prefix>_<opc>, indexed
 * relative to <prefix><base>; the fail id follows the
 * OBD_FAIL_<prefix>_<opc>_NET naming convention.
 * Fix(review): the macro body pastes the token `opc' but the parameter
 * was declared `name', so no expansion could ever have compiled; the
 * parameter is renamed to match the body (call sites are positional
 * and unchanged). */
701 #define DEF_HNDL(prefix, base, flags, opc, fn) \
702 [prefix ## opc - prefix ## base] = { \
704 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## _NET, \
705 .mh_opc = prefix ## _ ## opc, \

710 #define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(mdt, CONNECT, flags, name, fn)
/* Dispatch table for MDS opcodes, indexed relative to the CONNECT base.
 * HABEO_CORPUS entries carry an mds_body that mdt_req_handle() unpacks
 * before invoking the handler. */
712 static struct mdt_handler mdt_mds_ops[] = {
713 DEF_MDT_HNDL(0, CONNECT, mdt_connect),
714 DEF_MDT_HNDL(0, DISCONNECT, mdt_disconnect),
715 DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus),
716 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mdt_getattr),
717 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mdt_getattr_name),
718 DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mdt_setxattr),
719 DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mdt_getxattr),
720 DEF_MDT_HNDL(0, STATFS, mdt_statfs),
721 DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mdt_readpage),
722 DEF_MDT_HNDL(0, REINT, mdt_reint),
723 DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mdt_close),
724 DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
725 DEF_MDT_HNDL(0, PIN, mdt_pin),
726 DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync),
        /* NOTE(review): literal 0 placeholder for the SET_INFO opcode —
         * confirm against the opcode enum; it aliases index -CONNECT. */
727 DEF_MDT_HNDL(0, 0 /*SET_INFO*/, mdt_set_info),
728 DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck),
729 DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl)
/* Handler tables for the non-MDS portals (currently empty in this
 * prototype) and the slice table mapping opcode ranges onto them. */
732 static struct mdt_handler mdt_obd_ops[] = {
735 static struct mdt_handler mdt_dlm_ops[] = {
738 static struct mdt_handler mdt_llog_ops[] = {
/* Each slice covers the half-open opcode range
 * [mos_opc_start, mos_opc_end) and points at its handler table. */
741 static struct mdt_opc_slice {
744 struct mdt_handler *mos_hs;
747 .mos_opc_start = MDS_GETATTR,
748 .mos_opc_end = MDS_LAST_OPC,
749 .mos_hs = mdt_mds_ops
752 .mos_opc_start = OBD_PING,
753 .mos_opc_end = OBD_LAST_OPC,
754 .mos_hs = mdt_obd_ops
757 .mos_opc_start = LDLM_ENQUEUE,
758 .mos_opc_end = LDLM_LAST_OPC,
759 .mos_hs = mdt_dlm_ops
762 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
763 .mos_opc_end = LLOG_LAST_OPC,
764 .mos_hs = mdt_llog_ops
/* Upper bound on reply buffers a single mdt request may use. */
769 MDT_REP_BUF_NR_MAX = 8
773 * Common data shared by mdt-level handlers. This is allocated per-thread to
774 * reduce stack consumption.
776 struct mdt_thread_info {
777 struct mdt_device *mti_mdt;
779 * number of buffers in reply message.
783 * sizes of reply buffers.
785 int mti_rep_buf_size[MDT_REP_BUF_NR_MAX];
787 * Body for "habeo corpus" operations.
789 struct mds_body *mti_body;
791 * Host object. This is released at the end of mdt_handler().
793 struct mdt_object *mti_object;
795 * Additional fail id that can be set by handler. Passed to
796 * target_send_reply().
800 * Offset of incoming buffers. 0 for top-level request processing. +ve
801 * for intent handling.
/* Find the handler for @opc by scanning the opcode-range slices and
 * indexing into the matching slice's handler table; yields NULL for an
 * opcode no table supports.
 * Fix(review): the sanity assertion read h->mos_opc, but mos_* are
 * struct mdt_opc_slice fields — the handler's opcode field is mh_opc
 * (set by DEF_HNDL above). */
806 struct mdt_handler *mdt_handler_find(__u32 opc)
809 struct mdt_opc_slice *s;
810 struct mdt_handler *h;
813 for (i = 0, s = mdt_handlers; i < ARRAY_SIZE(mdt_handlers); i++, s++) {
814 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
815 h = s->mos_hs + (opc - s->mos_opc_start);
817 LASSERT(h->mh_opc == opc);
819 h = NULL; /* unsupported opc */
/* Find-or-create the mdt_object for fid @f in the device's lu site; an
 * error pointer from lu_object_find() is propagated to the caller.
 * Fix(review): the error-path cast named struct mdd_object — the MDD
 * layer's type — while this function returns struct mdt_object *. */
826 struct mdt_object *mdt_object_find(struct mdt_device *d, struct lfid *f)
830 o = lu_object_find(&d->mdt_lu_dev.ld_site, f);
832 return (struct mdt_object *)o;
834 return container_of(o, struct mdt_object, mot_obj.mo_lu);
/* Drop one reference on @o; the lu layer frees it on last put. */
837 void mdt_object_put(struct mdt_object *o)
839 lu_object_put(&o->mot_obj.mo_lu);
/* Dispatch one request to its mdt handler: honour the handler's fail
 * id, unpack the mds_body and pin the target object for HABEO_CORPUS
 * operations, invoke the handler, and record its result in rq_status. */
842 static int mdt_req_handle(struct mdt_thread_info *info,
843 struct mdt_handler *h, struct ptlrpc_request *req,
850 LASSERT(h->mh_act != NULL);
851 LASSERT(h->mh_opc == req->rq_reqmsg->opc);
853 DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
855 if (h->mh_fail_id != 0)
856 OBD_FAIL_RETURN(h->mh_fail_id, 0);
858 h->mh_offset = MDS_REQ_REC_OFF + shift;
859 if (h->mh_flags & HABEO_CORPUS) {
860 info->mti_body = lustre_swab_reqbuf(req, h->mh_offset,
861 sizeof *info->mti_body,
862 lustre_swab_mds_body);
863 if (info->mti_body == NULL) {
864 CERROR("Can't unpack body\n");
865 result = req->rq_status = -EFAULT;
                /* Fix(review): mti_body is a pointer (struct mds_body *),
                 * and mdt_object_find() takes a fid pointer, so the fid
                 * must be reached via -> and passed by address. */
867 info->mti_object = mdt_object_find(info->mti_mdt,
868 &info->mti_body->fid1);
869 if (IS_ERR(info->mti_object))
870 result = PTR_ERR(info->mti_object);
        /* Fix(review): mh_act's prototype (see struct mdt_handler) is
         * (info, req); the handler itself was erroneously passed too. */
873 result = h->mh_act(info, req);
875 * XXX result value is unconditionally shoved into ->rq_status
876 * (original code sometimes placed error code into ->rq_status, and
877 * sometimes returned it to the
878 * caller). ptlrpc_server_handle_request() doesn't check return value
881 req->rq_status = result;
/* Reset per-thread handler state before each request: zero everything,
 * install the default reply fail id, and poison the reply-buffer size
 * array with ~0 sentinels so unset sizes are detectable. */
885 static void mdt_thread_info_init(struct mdt_thread_info *info)
887 memset(info, 0, sizeof *info);
888 info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
892 for (info->mti_rep_buf_nr = 0;
893 info->mti_rep_buf_nr < MDT_REP_BUF_NR_MAX; info->mti_rep_buf_nr++)
894 info->mti_rep_buf_size[info->mti_rep_buf_nr] = ~0;
/* Release the object pinned during HABEO_CORPUS handling, if any. */
897 static void mdt_thread_info_fini(struct mdt_thread_info *info)
899 if (info->mti_object != NULL) {
900 mdt_object_put(info->mti_object);
901 info->mti_object = NULL;
/* Top-level ptlrpc service callback: validate the message, enforce
 * connect/recovery rules, dispatch through the handler tables, then
 * stamp reply transno fields and send the reply.
 * NOTE(review): lines are elided in this listing (e.g. where
 * mdt_thread_info_init/fini are called); only visible code is
 * described. */
905 int mdt_handle(struct ptlrpc_request *req)
909 struct mds_obd *mds = NULL; /* quell gcc overwarning */
910 struct obd_device *obd = NULL;
911 struct mdt_thread_info info; /* XXX on stack for now */
912 struct mdt_handler *h;
916 OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
        /* A handler must never leave a journal transaction open. */
918 LASSERT(current->journal_info == NULL);
920 rc = mds_msg_check_version(req->rq_reqmsg);
922 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
926 /* XXX identical to OST */
927 if (req->rq_reqmsg->opc != MDS_CONNECT) {
928 struct mds_export_data *med;
929 int recovering, abort_recovery;
        /* Everything except CONNECT requires an established export. */
931 if (req->rq_export == NULL) {
932 CERROR("operation %d on unconnected MDS from %s\n",
934 libcfs_id2str(req->rq_peer));
935 req->rq_status = -ENOTCONN;
936 GOTO(out, rc = -ENOTCONN);
939 med = &req->rq_export->exp_mds_data;
940 obd = req->rq_export->exp_obd;
943 /* sanity check: if the xid matches, the request must
944 * be marked as a resent or replayed */
945 if (req->rq_xid == med->med_mcd->mcd_last_xid)
946 LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
947 (MSG_RESENT | MSG_REPLAY),
948 "rq_xid "LPU64" matches last_xid, "
949 "expected RESENT flag\n",
951 /* else: note the opposite is not always true; a
952 * RESENT req after a failover will usually not match
953 * the last_xid, since it was likely never
954 * committed. A REPLAYed request will almost never
955 * match the last xid, however it could for a
956 * committed, but still retained, open. */
958 /* Check for aborted recovery. */
959 spin_lock_bh(&obd->obd_processing_task_lock);
960 abort_recovery = obd->obd_abort_recovery;
961 recovering = obd->obd_recovering;
962 spin_unlock_bh(&obd->obd_processing_task_lock);
963 if (abort_recovery) {
964 target_abort_recovery(obd);
965 } else if (recovering) {
        /* During recovery only replay/allowed requests proceed. */
966 rc = mds_filter_recovery_request(req, obd,
968 if (rc || !should_process)
        /* Dispatch via the opcode-slice handler tables. */
973 h = mdt_handler_find(req->rq_reqmsg->opc);
975 rc = mdt_handle_req(&info, h, req, 0);
977 req->rq_status = -ENOTSUPP;
978 rc = ptlrpc_error(req);
982 LASSERT(current->journal_info == NULL);
984 /* If we're DISCONNECTing, the mds_export_data is already freed */
985 if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
986 struct mds_export_data *med = &req->rq_export->exp_mds_data;
987 req->rq_repmsg->last_xid =
988 le64_to_cpu(med->med_mcd->mcd_last_xid);
990 target_committed_to_req(req);
        /* Final replay request: queue the reply until recovery ends. */
996 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
997 if (obd && obd->obd_recovering) {
998 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
999 return target_queue_final_reply(req, rc);
1001 /* Lost a race with recovery; let the error path DTRT. */
1002 rc = req->rq_status = -ENOTCONN;
1005 target_send_reply(req, rc, info.mti_fail_id);
/* LDLM intent policy stub: every intent lock request is aborted, which
 * tells the client to fall back to separate RPCs. */
1009 static int mdt_intent_policy(struct ldlm_namespace *ns,
1010 struct ldlm_lock **lockp, void *req_cookie,
1011 ldlm_mode_t mode, int flags, void *data)
1013 RETURN(ELDLM_LOCK_ABORTED);
/* Thin adapter: expand a ptlrpc_service_conf into the long argument
 * list of ptlrpc_init_svc().
 * Fix(review): the call passed `char name' — a stray parameter
 * declaration inside a call expression, which cannot compile; the
 * `name' argument is passed instead. */
1016 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
1017 svc_handler_t h, char *name,
1018 struct proc_dir_entry *proc_entry,
1019 svcreq_printfn_t prntfn)
1021 return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
1022 c->psc_max_req_size, c->psc_max_reply_size,
1023 c->psc_req_portal, c->psc_rep_portal,
1024 c->psc_watchdog_timeout,
1025 h, name, proc_entry,
1026 prntfn, c->psc_num_threads);
/* Initialize the lu_device embedded in a generic md device. */
1029 int md_device_init(struct md_device *md)
1031 return lu_device_init(&md->md_lu_dev);
/* Finalize the lu_device embedded in a generic md device. */
1034 void md_device_fini(struct md_device *md)
1036 lu_device_fini(&md->md_lu_dev);
/* Forward declaration: operations table defined at the bottom of file. */
1039 static struct lu_device_operations mdt_lu_ops;
/* Initialize an mdt device: hook up the lu operations and fill the
 * ptlrpc service configuration with the standard MDS portal/buffer
 * parameters. */
1041 static int mdt_device_init(struct mdt_device *m)
1043 md_device_init(&m->mdt_md_dev);
1045 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1047 m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
1048 m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE;
1049 m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE;
1050 m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE;
1051 m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL;
1052 m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL;
1053 m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
1055 * We'd like to have a mechanism to set this on a per-device basis,
        /* NOTE(review): this reads mds_num_threads, but the module
         * parameter declared at the bottom of the file is
         * mdt_num_threads — confirm which variable actually exists. */
1058 if (mds_num_threads < 2)
1059 mds_num_threads = MDS_DEF_THREADS;
1060 m->mdt_service_conf.psc_num_threads = min(mds_num_threads,
/* Counterpart of mdt_device_init(): release embedded md device state. */
1065 static void mdt_device_fini(struct mdt_device *m)
1067 md_device_fini(&m->mdt_md_dev);
/* Is @d an mdt device?  Identified by its ops table pointer. */
1070 static int lu_device_is_mdt(struct lu_device *d)
1073 * XXX for now. Tags in lu_device_type->ldt_something are needed.
1075 return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
/* Downcast a lu_device known to belong to an mdt device.
 * NOTE(review): this assumes an mdt_lu_dev member, while
 * mdt_device_init() reaches the lu_device via mdt_md_dev.md_lu_dev —
 * confirm the struct layout; the two paths may need to agree. */
1078 static struct mdt_device *mdt_dev(struct lu_device *d)
1080 LASSERT(lu_device_is_mdt(d));
1081 return container_of(d, struct mdt_device, mdt_lu_dev);
/* Downcast a lu_object known to belong to an mdt device. */
1084 static struct mdt_object *mdt_obj(struct lu_object *o)
1086 LASSERT(lu_device_is_mdt(o->lo_dev));
1087 return container_of(o, struct mdt_object, mot_obj.mo_lu);
/* Tear down everything mdt_init0() set up: site, ptlrpc service and
 * LDLM namespace; by now all object references must be gone. */
1090 static void mdt_fini(struct lu_device *d)
1092 struct mdt_device *m = mdt_dev(d);
1094 if (d->ld_site != NULL) {
1095 lu_site_fini(d->ld_site);
1098 if (m->mdt_service != NULL) {
1099 ptlrpc_unregister_service(m->mdt_service);
1100 m->mdt_service = NULL;
1102 if (m->mdt_namespace != NULL) {
1103 ldlm_namespace_free(m->mdt_namespace, 0);
1104 m->mdt_namespace = NULL;
        /* Device must be unreferenced before it is finalized. */
1107 LASSERT(atomic_read(&d->ld_ref) == 0);
/* One-time setup of an mdt device: create the server-side LDLM
 * namespace, register the intent policy, initialize the ldlm callback
 * client, create the request service and start its threads.
 * Fix(review): `m->mst_namespace' was a typo for `m->mdt_namespace'
 * (the field assigned two lines above), and the service-setup lines
 * dereferenced an undeclared `mdt' instead of the local `m'. */
1110 static int mdt_init0(struct lu_device *d)
1112 struct mdt_device *m = mdt_dev(d);
1125 snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1126 m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1127 if (m->mdt_namespace == NULL)
1129 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1131 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1132 "mdt_ldlm_client", &m->mdt_ldlm_client);
1134 m->mdt_service = ptlrpc_init_svc_conf(&m->mdt_service_conf,
1135 mdt_handle, LUSTRE_MDT0_NAME,
1136 m->mdt_lu_dev.ld_proc_entry
1138 if (m->mdt_service == NULL)
1141 return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
/* lu_device init entry point: delegates to mdt_init0(). */
1144 static int mdt_init(struct lu_device *d)
1148 result = mdt_init0(d);
/* Allocate a new mdt object for device @d: set up its embedded
 * lu_object and header and make it the top of the object stack. */
1154 struct lu_object *mdt_object_alloc(struct lu_device *d)
1156 struct mdt_object *mo;
1160 struct lu_object *o;
1161 struct lu_object_header *h;
1163 o = &mo->mot_obj.mo_lu;
1164 h = &mo->mot_header;
1165 lu_object_header_init(h);
1166 lu_object_init(o, h, d);
1167 /* ->lo_depth and ->lo_flags are automatically 0 */
1168 lu_object_add_top(h, o);
/* Second stage of object creation: allocate the corresponding object
 * in the underlying md device and stack it below this one. */
1173 int mdt_object_init(struct lu_object *o)
1175 struct mdt_device *d = mdt_dev(o->lo_dev);
1176 struct lu_device *under;
1177 struct lu_object *below;
1179 under = &d->mdt_mdd->md_lu_dev;
1180 below = under->ld_ops->ldo_alloc(under);
1181 if (below != NULL) {
1182 lu_object_add(o, below);
/* Free an mdt object and its header.
 * NOTE(review): the listing shows a bare `struct lu_object_header;'
 * declaration and an `h' with no visible definition — lines appear to
 * be missing here (likely `*h = o->lo_header;' plus fini calls);
 * confirm against the original source. */
1188 void mdt_object_free(struct lu_object *o)
1190 struct lu_object_header;
1194 lu_object_header_fini(h);
/* Called when the last reference is dropped; nothing to do for mdt. */
1197 void mdt_object_release(struct lu_object *o)
/* Debug helper: print a one-line description of @o into seq file @f. */
1201 int mdt_object_print(struct seq_file *f, const struct lu_object *o)
1203 return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
/* lu_device operations vector tying the mdt callbacks together. */
1206 static struct lu_device_operations mdt_lu_ops = {
1207 .ldo_init = mdt_init,
1208 .ldo_fini = mdt_fini,
1209 .ldo_object_alloc = mdt_object_alloc,
1210 .ldo_object_init = mdt_object_init,
1211 .ldo_object_free = mdt_object_free,
1212 .ldo_object_release = mdt_object_release,
1213 .ldo_object_print = mdt_object_print
/* Create directory @name under parent fid @pfid: find the parent
 * object, take a PW lock on the fid, and delegate to the underlying
 * md layer's mkdir.
 * NOTE(review): `d->mdt_dev.md_ops' — elsewhere the md layer is
 * reached via mdt_mdd (see mdt_object_init); confirm the member name. */
1216 int mdt_mkdir(struct mdt_device *d, struct lfid *pfid, const char *name)
1218 struct mdt_object *o;
1219 struct lock_handle lh;
1222 o = mdt_object_find(d, pfid);
1225 result = fid_lock(pfid, LCK_PW, &lh);
1227 result = d->mdt_dev.md_ops->mdo_mkdir(o, name);
/* obd_ops vector for the MDT obd type.
 * NOTE(review): connect/disconnect/export callbacks point at the mds_*
 * family although mdt_connect()/mdt_disconnect()/mdt_init_export()/
 * mdt_destroy_export() are defined (static) in this file and otherwise
 * unused — confirm whether the mdt_* versions were meant here. */
1234 static struct obd_ops mdt_ops = {
1235 .o_owner = THIS_MODULE,
1236 .o_connect = mds_connect,
1237 .o_reconnect = mds_reconnect,
1238 .o_init_export = mds_init_export,
1239 .o_destroy_export = mds_destroy_export,
1240 .o_disconnect = mds_disconnect,
1241 .o_setup = mds_setup,
1242 .o_precleanup = mds_precleanup,
1243 .o_cleanup = mds_cleanup,
1244 .o_postrecov = mds_postrecov,
1245 .o_statfs = mds_obd_statfs,
1246 .o_iocontrol = mds_iocontrol,
1247 .o_create = mds_obd_create,
1248 .o_destroy = mds_obd_destroy,
1249 .o_llog_init = mds_llog_init,
1250 .o_llog_finish = mds_llog_finish,
1251 .o_notify = mds_notify,
1252 .o_health_check = mds_health_check,
/* Module entry point (body not visible in this listing). */
1255 static int __init mdt_mod_init(void)
/* Module exit point (body not visible in this listing). */
1260 static void __exit mdt_mod_exit(void)
1264 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1265 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1266 MODULE_LICENSE("GPL");
1268 CFS_MODULE_PARM(mdt_num_threads, "i", int, 0444,
1269 "number of mdt service threads to start");
1271 cfs_module(mdt, "0.0.2", mdt_mod_init, mdt_mod_exit);