1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
14 * This file is part of the Lustre file system, http://www.lustre.org
15 * Lustre is a trademark of Cluster File Systems, Inc.
17 * You may have signed or agreed to another license before downloading
18 * this software. If so, you are bound by the terms and conditions
19 * of that agreement, and the following does not apply to you. See the
20 * LICENSE file included with this distribution for more information.
22 * If you did not agree to a different license, then this copy of Lustre
23 * is open source software; you can redistribute it and/or modify it
24 * under the terms of version 2 of the GNU General Public License as
25 * published by the Free Software Foundation.
27 * In either case, Lustre is distributed in the hope that it will be
28 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 * license text for more details.
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_MDS
38 #include <linux/module.h>
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
43 * struct OBD_{ALLOC,FREE}*()
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
54 /* struct mds_client_data */
55 #include "../mds/mds_internal.h"
56 #include "mdt_internal.h"
59 * Initialized in mdt_mod_init().
61 unsigned long mdt_num_threads;
63 static int mdt_handle (struct ptlrpc_request *req);
64 static struct mdt_device *mdt_dev (struct lu_device *d);
65 static struct lu_fid *mdt_object_fid(struct mdt_object *o);
67 static struct lu_context_key mdt_thread_key;
69 /* object operations */
70 static int mdt_md_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
71 struct lu_fid *pfid, const char *name,
75 struct mdt_object *child;
76 struct mdt_lock_handle *lh;
80 lh = &info->mti_lh[MDT_LH_PARENT];
81 lh->mlh_mode = LCK_PW;
83 o = mdt_object_find_lock(info->mti_ctxt,
84 d, pfid, lh, MDS_INODELOCK_UPDATE);
88 child = mdt_object_find(info->mti_ctxt, d, cfid);
90 struct md_object *next = mdt_object_child(o);
92 result = next->mo_ops->moo_mkdir(info->mti_ctxt, next, name,
93 mdt_object_child(child));
94 mdt_object_put(info->mti_ctxt, child);
96 result = PTR_ERR(child);
97 mdt_object_unlock(d->mdt_namespace, o, lh);
98 mdt_object_put(info->mti_ctxt, o);
102 static int mdt_getstatus(struct mdt_thread_info *info,
103 struct ptlrpc_request *req, int offset)
105 struct md_device *next = info->mti_mdt->mdt_child;
106 struct mdt_body *body;
107 int size = sizeof *body;
112 result = lustre_pack_reply(req, 1, &size, NULL);
114 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
116 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
119 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
120 result = next->md_ops->mdo_root_get(info->mti_ctxt,
124 /* the last_committed and last_xid fields are filled in for all
125 * replies already - no need to do so here also.
130 static int mdt_statfs(struct mdt_thread_info *info,
131 struct ptlrpc_request *req, int offset)
133 struct md_device *next = info->mti_mdt->mdt_child;
134 struct obd_statfs *osfs;
137 int size = sizeof(struct obd_statfs);
141 result = lustre_pack_reply(req, 1, &size, NULL);
143 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
145 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
146 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
149 osfs = lustre_msg_buf(req->rq_repmsg, 0, size);
150 /* XXX max_age optimisation is needed here. See mds_statfs */
151 result = next->md_ops->mdo_statfs(info->mti_ctxt, next, &sfs);
152 statfs_pack(osfs, &sfs);
158 static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
160 b->valid |= OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID |
161 OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
162 OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
164 if (!S_ISREG(attr->la_mode))
165 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
168 b->atime = attr->la_atime;
169 b->mtime = attr->la_mtime;
170 b->ctime = attr->la_ctime;
171 b->mode = attr->la_mode;
172 b->size = attr->la_size;
173 b->blocks = attr->la_blocks;
174 b->uid = attr->la_uid;
175 b->gid = attr->la_gid;
176 b->flags = attr->la_flags;
177 b->nlink = attr->la_nlink;
180 static int mdt_getattr(struct mdt_thread_info *info,
181 struct ptlrpc_request *req, int offset)
183 struct mdt_body *body;
184 int size = sizeof (*body);
187 LASSERT(info->mti_object != NULL);
191 result = lustre_pack_reply(req, 1, &size, NULL);
193 CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n",
195 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
196 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
199 struct md_object *next = mdt_object_child(info->mti_object);
201 result = next->mo_ops->moo_attr_get(info->mti_ctxt, next,
202 &info->mti_ctxt->lc_attr);
204 body = lustre_msg_buf(req->rq_repmsg, 0, size);
205 mdt_pack_attr2body(body, &info->mti_ctxt->lc_attr);
206 body->fid1 = *mdt_object_fid(info->mti_object);
212 static struct lu_device_operations mdt_lu_ops;
214 static int lu_device_is_mdt(struct lu_device *d)
217 * XXX for now. Tags in lu_device_type->ldt_something are needed.
219 return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
222 static struct mdt_device *mdt_dev(struct lu_device *d)
224 LASSERT(lu_device_is_mdt(d));
225 return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
228 static int mdt_connect(struct mdt_thread_info *info,
229 struct ptlrpc_request *req, int offset)
233 result = target_handle_connect(req, mdt_handle);
235 struct obd_connect_data *data;
237 LASSERT(req->rq_export != NULL);
238 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
240 data = lustre_msg_buf(req->rq_repmsg, 0, sizeof *data);
241 result = seq_mgr_alloc(info->mti_ctxt,
242 info->mti_mdt->mdt_seq_mgr,
/*
 * Disconnect handler: hands the request to target_handle_disconnect() and
 * returns whatever that call returns. @info and @offset are not used.
 */
static int mdt_disconnect(struct mdt_thread_info *info,
                          struct ptlrpc_request *req, int offset)
{
        int rc;

        rc = target_handle_disconnect(req);
        return rc;
}
255 static int mdt_getattr_name(struct mdt_thread_info *info,
256 struct ptlrpc_request *req, int offset)
261 static int mdt_setxattr(struct mdt_thread_info *info,
262 struct ptlrpc_request *req, int offset)
267 static int mdt_getxattr(struct mdt_thread_info *info,
268 struct ptlrpc_request *req, int offset)
273 static int mdt_readpage(struct mdt_thread_info *info,
274 struct ptlrpc_request *req, int offset)
279 static int mdt_reint(struct mdt_thread_info *info,
280 struct ptlrpc_request *req, int offset)
285 static int mdt_close(struct mdt_thread_info *info,
286 struct ptlrpc_request *req, int offset)
291 static int mdt_done_writing(struct mdt_thread_info *info,
292 struct ptlrpc_request *req, int offset)
297 static int mdt_pin(struct mdt_thread_info *info,
298 struct ptlrpc_request *req, int offset)
303 static int mdt_sync(struct mdt_thread_info *info,
304 struct ptlrpc_request *req, int offset)
309 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
310 struct ptlrpc_request *req, int offset)
315 static int mdt_handle_quotactl(struct mdt_thread_info *info,
316 struct ptlrpc_request *req, int offset)
325 static struct ldlm_callback_suite cbs = {
326 .lcs_completion = ldlm_server_completion_ast,
327 .lcs_blocking = ldlm_server_blocking_ast,
331 static int mdt_enqueue(struct mdt_thread_info *info,
332 struct ptlrpc_request *req, int offset)
335 * info->mti_dlm_req already contains swapped and (if necessary)
336 * converted dlm request.
338 LASSERT(info->mti_dlm_req != NULL);
340 info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
341 return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
344 static int mdt_convert(struct mdt_thread_info *info,
345 struct ptlrpc_request *req, int offset)
347 LASSERT(info->mti_dlm_req);
348 return ldlm_handle_convert0(req, info->mti_dlm_req);
351 static int mdt_bl_callback(struct mdt_thread_info *info,
352 struct ptlrpc_request *req, int offset)
354 CERROR("bl callbacks should not happen on MDS\n");
359 static int mdt_cp_callback(struct mdt_thread_info *info,
360 struct ptlrpc_request *req, int offset)
362 CERROR("cp callbacks should not happen on MDS\n");
368 * Build (DLM) resource name from fid.
370 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
371 struct ldlm_res_id *name)
373 memset(name, 0, sizeof *name);
374 /* we use fid_num(), which also includes the object version, instead of raw
376 name->name[0] = fid_seq(f);
377 name->name[1] = fid_num(f);
382 * Return true if resource is for object identified by fid.
384 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
386 return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
389 /* Issues a dlm lock for @f in the passed @ns and stores its lock handle into @lh. */
390 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
391 struct lustre_handle *lh, ldlm_mode_t mode,
392 ldlm_policy_data_t *policy)
394 struct ldlm_res_id res_id;
402 /* FIXME: is it correct to have @flags=0 here? */
403 rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
404 LDLM_IBITS, policy, mode, &flags,
405 ldlm_blocking_ast, ldlm_completion_ast, NULL,
406 NULL, NULL, 0, NULL, lh);
407 RETURN (rc == ELDLM_OK ? 0 : -EIO);
410 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
411 struct lustre_handle *lh, ldlm_mode_t mode)
413 struct ldlm_lock *lock;
416 /* FIXME: this is debug stuff, remove it later. */
417 lock = ldlm_handle2lock(lh);
419 CERROR("invalid lock handle "LPX64, lh->cookie);
423 LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
425 ldlm_lock_decref(lh, mode);
429 static struct mdt_object *mdt_obj(struct lu_object *o)
431 LASSERT(lu_device_is_mdt(o->lo_dev));
432 return container_of(o, struct mdt_object, mot_obj.mo_lu);
435 struct mdt_object *mdt_object_find(struct lu_context *ctxt,
436 struct mdt_device *d,
441 o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
443 return (struct mdt_object *)o;
448 void mdt_object_put(struct lu_context *ctxt, struct mdt_object *o)
450 lu_object_put(ctxt, &o->mot_obj.mo_lu);
453 static struct lu_fid *mdt_object_fid(struct mdt_object *o)
455 return lu_object_fid(&o->mot_obj.mo_lu);
458 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
459 struct mdt_lock_handle *lh, __u64 ibits)
461 ldlm_policy_data_t p = {
466 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
467 LASSERT(lh->mlh_mode != LCK_MINMODE);
469 return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
472 void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
473 struct mdt_lock_handle *lh)
475 if (lustre_handle_is_used(&lh->mlh_lh)) {
476 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
477 lh->mlh_lh.cookie = 0;
481 struct mdt_object *mdt_object_find_lock(struct lu_context *ctxt,
482 struct mdt_device *d,
484 struct mdt_lock_handle *lh,
487 struct mdt_object *o;
489 o = mdt_object_find(ctxt, d, f);
493 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
495 mdt_object_put(ctxt, o);
507 int (*mh_act)(struct mdt_thread_info *info,
508 struct ptlrpc_request *req, int offset);
/*
 * Flags describing what a request handler expects to find in the incoming
 * request. Each flag is a distinct bit so that they can be tested with `&'.
 */
enum mdt_handler_flags {
        /* struct mdt_body is passed in the 0-th incoming buffer. */
        HABEO_CORPUS = 0x1,
        /*
         * struct ldlm_request is passed in the
         * MDS_REQ_INTENT_LOCKREQ_OFF-th incoming buffer.
         */
        HABEO_CLAVIS = 0x2
};
523 struct mdt_opc_slice {
526 struct mdt_handler *mos_hs;
529 static struct mdt_opc_slice mdt_handlers[];
531 static struct mdt_handler *mdt_handler_find(__u32 opc)
533 struct mdt_opc_slice *s;
534 struct mdt_handler *h;
537 for (s = mdt_handlers; s->mos_hs != NULL; s++) {
538 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
539 h = s->mos_hs + (opc - s->mos_opc_start);
541 LASSERT(h->mh_opc == opc);
543 h = NULL; /* unsupported opc */
550 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
552 return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
555 static int mdt_lock_resname_compat(struct mdt_device *m,
556 struct ldlm_request *req)
558 /* XXX something... later. */
562 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
564 /* XXX something... later. */
569 * Invoke handler for this request opc. Also do necessary preprocessing
570 * (according to handler ->mh_flags), and post-processing (setting of
571 * ->last_{xid,committed}).
573 static int mdt_req_handle(struct mdt_thread_info *info,
574 struct mdt_handler *h, struct ptlrpc_request *req,
582 LASSERT(h->mh_act != NULL);
583 LASSERT(h->mh_opc == req->rq_reqmsg->opc);
584 LASSERT(current->journal_info == NULL);
586 DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
588 if (h->mh_fail_id != 0)
589 OBD_FAIL_RETURN(h->mh_fail_id, 0);
591 off = MDS_REQ_REC_OFF + shift;
594 if (h->mh_flags & HABEO_CORPUS) {
595 struct mdt_body *body;
597 body = info->mti_body =
598 lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
599 lustre_swab_mdt_body);
601 info->mti_object = mdt_object_find(info->mti_ctxt,
604 if (IS_ERR(info->mti_object)) {
605 result = PTR_ERR(info->mti_object);
606 info->mti_object = NULL;
609 CERROR("Can't unpack body\n");
612 } else if (h->mh_flags & HABEO_CLAVIS) {
613 struct ldlm_request *dlm;
616 dlm = info->mti_dlm_req =
617 lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
619 lustre_swab_ldlm_request);
621 if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
622 result = mdt_lock_resname_compat(info->mti_mdt,
625 CERROR("Can't unpack dlm request\n");
633 result = h->mh_act(info, req, off);
635 * XXX result value is unconditionally shoved into ->rq_status
636 * (original code sometimes placed error code into ->rq_status, and
637 * sometimes returned it to the
638 * caller). ptlrpc_server_handle_request() doesn't check return value
641 req->rq_status = result;
643 LASSERT(current->journal_info == NULL);
645 if (h->mh_flags & HABEO_CLAVIS &&
646 info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
647 struct ldlm_reply *rep;
649 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
651 result = mdt_lock_reply_compat(info->mti_mdt, rep);
654 /* If we're DISCONNECTing, the mds_export_data is already freed */
655 if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
656 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
657 target_committed_to_req(req);
662 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
664 lh->mlh_lh.cookie = 0ull;
665 lh->mlh_mode = LCK_MINMODE;
668 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
670 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
673 static void mdt_thread_info_init(struct mdt_thread_info *info)
677 info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
681 for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
682 info->mti_rep_buf_size[i] = ~0;
683 info->mti_rep_buf_nr = i;
684 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
685 mdt_lock_handle_init(&info->mti_lh[i]);
686 lu_context_enter(info->mti_ctxt);
689 static void mdt_thread_info_fini(struct mdt_thread_info *info)
693 lu_context_exit(info->mti_ctxt);
694 if (info->mti_object != NULL) {
695 mdt_object_put(info->mti_ctxt, info->mti_object);
696 info->mti_object = NULL;
698 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
699 mdt_lock_handle_fini(&info->mti_lh[i]);
702 static int mds_msg_check_version(struct lustre_msg *msg)
706 /* TODO: enable the below check while really introducing msg version.
707 * it's disabled because it will break compatibility with b1_4.
715 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
717 CERROR("bad opc %u version %08x, expecting %08x\n",
718 msg->opc, msg->version, LUSTRE_OBD_VERSION);
722 case MDS_GETATTR_NAME:
727 case MDS_DONE_WRITING:
737 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
739 CERROR("bad opc %u version %08x, expecting %08x\n",
740 msg->opc, msg->version, LUSTRE_MDS_VERSION);
744 case LDLM_BL_CALLBACK:
745 case LDLM_CP_CALLBACK:
746 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
748 CERROR("bad opc %u version %08x, expecting %08x\n",
749 msg->opc, msg->version, LUSTRE_DLM_VERSION);
752 case LLOG_ORIGIN_HANDLE_CREATE:
753 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
754 case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
755 case LLOG_ORIGIN_HANDLE_READ_HEADER:
756 case LLOG_ORIGIN_HANDLE_CLOSE:
758 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
760 CERROR("bad opc %u version %08x, expecting %08x\n",
761 msg->opc, msg->version, LUSTRE_LOG_VERSION);
764 CERROR("MDS unknown opcode %d\n", msg->opc);
770 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
771 struct obd_device *obd, int *process)
773 switch (req->rq_reqmsg->opc) {
774 case MDS_CONNECT: /* This will never get here, but for completeness. */
775 case OST_CONNECT: /* This will never get here, but for completeness. */
782 case MDS_SYNC: /* used in unmounting */
786 *process = target_queue_recovery_request(req, obd);
790 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
792 /* XXX what should we set rq_status to here? */
793 req->rq_status = -EAGAIN;
794 RETURN(ptlrpc_error(req));
799 * Handle recovery. Return:
800 * +1: continue request processing;
801 * -ve: abort immediately with the given error code;
802 * 0: send reply with error code in req->rq_status;
804 static int mdt_recovery(struct ptlrpc_request *req)
808 struct obd_device *obd;
812 if (req->rq_reqmsg->opc == MDS_CONNECT)
815 if (req->rq_export == NULL) {
816 CERROR("operation %d on unconnected MDS from %s\n",
818 libcfs_id2str(req->rq_peer));
819 req->rq_status = -ENOTCONN;
823 /* sanity check: if the xid matches, the request must be marked as a
824 * resent or replayed */
825 LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
826 lustre_msg_get_flags(req->rq_reqmsg) &
827 (MSG_RESENT | MSG_REPLAY)),
828 "rq_xid "LPU64" matches last_xid, "
829 "expected RESENT flag\n", req->rq_xid);
831 /* else: note the opposite is not always true; a RESENT req after a
832 * failover will usually not match the last_xid, since it was likely
833 * never committed. A REPLAYed request will almost never match the
834 * last xid, however it could for a committed, but still retained,
837 obd = req->rq_export->exp_obd;
839 /* Check for aborted recovery... */
840 spin_lock_bh(&obd->obd_processing_task_lock);
841 abort_recovery = obd->obd_abort_recovery;
842 recovering = obd->obd_recovering;
843 spin_unlock_bh(&obd->obd_processing_task_lock);
844 if (abort_recovery) {
845 target_abort_recovery(obd);
846 } else if (recovering) {
850 rc = mdt_filter_recovery_request(req, obd, &should_process);
851 if (rc != 0 || !should_process) {
859 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
861 struct obd_device *obd;
863 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
864 if (req->rq_reqmsg->opc != OBD_PING)
865 DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
867 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
868 if (obd && obd->obd_recovering) {
869 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
870 RETURN(target_queue_final_reply(req, req->rq_status));
872 /* Lost a race with recovery; let the error path
874 req->rq_status = -ENOTCONN;
877 target_send_reply(req, req->rq_status, info->mti_fail_id);
878 RETURN(req->rq_status);
881 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
883 struct mdt_handler *h;
884 struct lustre_msg *msg;
889 OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
891 LASSERT(current->journal_info == NULL);
893 msg = req->rq_reqmsg;
894 result = mds_msg_check_version(msg);
896 result = mdt_recovery(req);
899 h = mdt_handler_find(msg->opc);
901 result = mdt_req_handle(info, h, req, 0);
903 req->rq_status = -ENOTSUPP;
904 result = ptlrpc_error(req);
909 result = mdt_reply(req, info);
912 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
916 static int mdt_handle(struct ptlrpc_request *req)
919 struct lu_context *ctx;
920 struct mdt_thread_info *info;
923 ctx = req->rq_svc_thread->t_ctx;
924 LASSERT(ctx != NULL);
925 LASSERT(ctx->lc_thread == req->rq_svc_thread);
927 info = lu_context_key_get(ctx, &mdt_thread_key);
928 LASSERT(info != NULL);
930 mdt_thread_info_init(info);
931 /* it can be NULL while CONNECT */
933 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
935 result = mdt_handle0(req, info);
936 mdt_thread_info_fini(info);
940 static int mdt_intent_policy(struct ldlm_namespace *ns,
941 struct ldlm_lock **lockp, void *req_cookie,
942 ldlm_mode_t mode, int flags, void *data)
945 RETURN(ELDLM_LOCK_ABORTED);
948 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
949 svc_handler_t h, char *name,
950 struct proc_dir_entry *proc_entry,
951 svcreq_printfn_t prntfn)
953 return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
954 c->psc_max_req_size, c->psc_max_reply_size,
955 c->psc_req_portal, c->psc_rep_portal,
956 c->psc_watchdog_timeout,
958 prntfn, c->psc_num_threads);
961 static int mdt_config(struct lu_context *ctx, struct mdt_device *m,
962 const char *name, void *buf, int size, int mode)
964 struct md_device *child = m->mdt_child;
966 RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode));
969 static int mdt_seq_mgr_hpr(struct lu_context *ctx, void *opaque, __u64 *seq,
972 struct mdt_device *m = opaque;
976 rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ,
982 static int mdt_seq_mgr_read(struct lu_context *ctx, void *opaque, __u64 *seq)
985 RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET));
988 static int mdt_seq_mgr_write(struct lu_context *ctx, void *opaque, __u64 *seq)
991 RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET));
994 struct lu_seq_mgr_ops seq_mgr_ops = {
995 .smo_read = mdt_seq_mgr_read,
996 .smo_write = mdt_seq_mgr_write
999 /* device init/fini methods */
1001 static int mdt_fld(struct mdt_thread_info *info,
1002 struct ptlrpc_request *req, int offset)
1004 struct lu_site *ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
1005 struct md_fld mf, *p, *reply;
1006 int size = sizeof(*reply);
1011 rc = lustre_pack_reply(req, 1, &size, NULL);
1015 opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), lustre_swab_generic_32s);
1016 p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
1019 rc = fld_handle(ls->ls_fld, *opt, &mf);
1023 reply = lustre_msg_buf(req->rq_repmsg, 0, size);
1028 struct dt_device *md2_bottom_dev(struct mdt_device *m)
1030 /*FIXME: get dt device here*/
1034 static int mdt_fld_init(struct mdt_device *m)
1036 struct dt_device *dt;
1041 dt = md2_bottom_dev(m);
1043 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1045 OBD_ALLOC_PTR(ls->ls_fld);
1050 rc = fld_server_init(ls->ls_fld, dt);
1055 static int mdt_fld_fini(struct mdt_device *m)
1057 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1060 if (ls && ls->ls_fld) {
1061 fld_server_fini(ls->ls_fld);
1062 OBD_FREE_PTR(ls->ls_fld);
1067 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
1069 if (m->mdt_service != NULL) {
1070 ptlrpc_unregister_service(m->mdt_service);
1071 m->mdt_service = NULL;
1073 if (m->mdt_fld_service != NULL) {
1074 ptlrpc_unregister_service(m->mdt_fld_service);
1075 m->mdt_fld_service = NULL;
1079 static int mdt_start_ptlrpc_service(struct mdt_device *m)
1084 m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
1085 m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE;
1086 m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE;
1087 m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE;
1088 m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL;
1089 m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL;
1090 m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
1092 * We'd like to have a mechanism to set this on a per-device basis,
1095 m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
1099 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1100 "mdt_ldlm_client", &m->mdt_ldlm_client);
1103 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1105 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1107 if (m->mdt_service == NULL)
1110 rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1112 GOTO(err_mdt_svc, rc);
1114 /*start mdt fld service */
1116 m->mdt_service_conf.psc_req_portal = MDS_FLD_PORTAL;
1118 m->mdt_fld_service =
1119 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1121 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1123 if (m->mdt_fld_service == NULL)
1126 rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, LUSTRE_FLD0_NAME);
1128 GOTO(err_fld_svc, rc);
1132 ptlrpc_unregister_service(m->mdt_fld_service);
1133 m->mdt_fld_service = NULL;
1135 ptlrpc_unregister_service(m->mdt_service);
1136 m->mdt_service = NULL;
1141 static void mdt_stack_fini(struct mdt_device *m)
1144 struct lu_device *d = md2lu_dev(m->mdt_child);
1145 /* go through the whole stack */
1147 struct lu_device *n;
1148 struct obd_type *type;
1149 struct lu_device_type *ldt = d->ld_type;
1153 /* each fini() returns next device in stack of layers
1154 * so we can avoid the recursion */
1155 n = ldt->ldt_ops->ldto_device_fini(d);
1156 ldt->ldt_ops->ldto_device_free(d);
1158 type = ldt->obd_type;
1160 class_put_type(type);
1161 /* switch to the next device in the layer */
1167 static struct lu_device *mdt_layer_setup(const char *typename,
1168 struct lu_device *child,
1169 struct lustre_cfg *cfg)
1171 struct obd_type *type;
1172 struct lu_device_type *ldt;
1173 struct lu_device *d;
1177 type = class_get_type(typename);
1179 CERROR("Unknown type: '%s'\n", typename);
1180 GOTO(out, rc = -ENODEV);
1184 ldt->obd_type = type;
1186 CERROR("type: '%s'\n", typename);
1187 GOTO(out_type, rc = -EINVAL);
1190 d = ldt->ldt_ops->ldto_device_alloc(ldt, cfg);
1192 CERROR("Cannot allocate device: '%s'\n", typename);
1193 GOTO(out_type, rc = -ENODEV);
1196 LASSERT(child->ld_site);
1197 d->ld_site = child->ld_site;
1200 rc = ldt->ldt_ops->ldto_device_init(d, child);
1202 CERROR("can't init device '%s', rc %d\n", typename, rc);
1203 GOTO(out_alloc, rc);
1209 ldt->ldt_ops->ldto_device_free(d);
1212 class_put_type(type);
1214 RETURN(ERR_PTR(rc));
1217 static int mdt_stack_init(struct mdt_device *m, struct lustre_cfg *cfg)
1219 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1222 /* init the stack */
1223 d = mdt_layer_setup(LUSTRE_OSD0_NAME, d, cfg);
1225 GOTO(out, rc = PTR_ERR(d));
1228 d = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
1230 GOTO(out, rc = PTR_ERR(d));
1233 d = mdt_layer_setup(LUSTRE_CMM0_NAME, d, cfg);
1235 GOTO(out, rc = PTR_ERR(d));
1238 m->mdt_child = lu2md_dev(d);
1246 static void mdt_fini(struct mdt_device *m)
1248 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1250 mdt_stop_ptlrpc_service(m);
1252 /* finish the stack */
1255 if (d->ld_site != NULL) {
1256 struct lustre_mount_info *lmi = d->ld_site->ls_lmi;
1258 server_put_mount(lmi->lmi_name, lmi->lmi_mnt);
1259 lu_site_fini(d->ld_site);
1260 OBD_FREE_PTR(d->ld_site);
1263 if (m->mdt_namespace != NULL) {
1264 ldlm_namespace_free(m->mdt_namespace, 0);
1265 m->mdt_namespace = NULL;
1268 if (m->mdt_seq_mgr) {
1269 seq_mgr_fini(m->mdt_seq_mgr);
1270 m->mdt_seq_mgr = NULL;
1273 LASSERT(atomic_read(&d->ld_ref) == 0);
1274 md_device_fini(&m->mdt_md_dev);
1277 static int mdt_init0(struct mdt_device *m,
1278 struct lu_device_type *t, struct lustre_cfg *cfg)
1283 struct lu_context ctx;
1284 const char *dev = lustre_cfg_string(cfg, 0);
1285 struct lustre_mount_info *lmi;
1293 md_device_init(&m->mdt_md_dev, t);
1294 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1296 rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
1298 CERROR("can't init lu_site, rc %d\n", rc);
1299 GOTO(err_fini_site, rc);
1303 lmi = server_get_mount(dev);
1305 CERROR("Cannot get mount info for %s!\n", dev);
1306 GOTO(err_fini_site, rc = -EFAULT);
1308 //put lmi into lu_site
1311 /* init the stack */
1312 rc = mdt_stack_init(m, cfg);
1314 CERROR("can't init device stack, rc %d\n", rc);
1315 GOTO(err_fini_mount, rc);
1318 m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
1319 if (!m->mdt_seq_mgr) {
1320 CERROR("can't initialize sequence manager\n");
1321 GOTO(err_fini_stack, rc);
1324 rc = lu_context_init(&ctx);
1326 GOTO(err_fini_mgr, rc);
1328 lu_context_enter(&ctx);
1329 /* init sequence info after device stack is initialized. */
1330 rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
1331 lu_context_exit(&ctx);
1333 GOTO(err_fini_ctx, rc);
1335 lu_context_fini(&ctx);
1337 snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1338 m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1339 if (m->mdt_namespace == NULL)
1340 GOTO(err_fini_site, rc = -ENOMEM);
1342 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1344 rc = mdt_fld_init(m);
1346 GOTO(err_free_ns, rc);
1348 rc = mdt_start_ptlrpc_service(m);
1350 GOTO(err_free_fld, rc);
1356 ldlm_namespace_free(m->mdt_namespace, 0);
1357 m->mdt_namespace = NULL;
1359 lu_context_fini(&ctx);
1361 seq_mgr_fini(m->mdt_seq_mgr);
1362 m->mdt_seq_mgr = NULL;
1366 server_put_mount(lmi->lmi_name, lmi->lmi_mnt);
1373 static struct lu_object *mdt_object_alloc(struct lu_context *ctxt,
1374 struct lu_device *d)
1376 struct mdt_object *mo;
1380 struct lu_object *o;
1381 struct lu_object_header *h;
1383 o = &mo->mot_obj.mo_lu;
1384 h = &mo->mot_header;
1385 lu_object_header_init(h);
1386 lu_object_init(o, h, d);
1387 lu_object_add_top(h, o);
1393 static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
1395 struct mdt_device *d = mdt_dev(o->lo_dev);
1396 struct lu_device *under;
1397 struct lu_object *below;
1399 under = &d->mdt_child->md_lu_dev;
1400 below = under->ld_ops->ldo_object_alloc(ctxt, under);
1401 if (below != NULL) {
1402 lu_object_add(o, below);
1408 static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
1410 struct mdt_object *mo = mdt_obj(o);
1411 struct lu_object_header *h;
1415 lu_object_header_fini(h);
/*
 * ->ldo_object_release() method of the mdt layer. Intentionally a no-op.
 */
static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
{
        /* nothing to do */
}
1423 static int mdt_object_print(struct lu_context *ctxt,
1424 struct seq_file *f, const struct lu_object *o)
1426 return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
1429 static struct lu_device_operations mdt_lu_ops = {
1430 .ldo_object_alloc = mdt_object_alloc,
1431 .ldo_object_init = mdt_object_init,
1432 .ldo_object_free = mdt_object_free,
1433 .ldo_object_release = mdt_object_release,
1434 .ldo_object_print = mdt_object_print
1437 /* mds_connect copy */
1438 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
1439 struct obd_uuid *cluuid,
1440 struct obd_connect_data *data)
1442 struct obd_export *exp;
1444 struct mdt_device *mdt;
1445 struct mds_export_data *med;
1446 struct mds_client_data *mcd = NULL;
1449 if (!conn || !obd || !cluuid)
1452 mdt = mdt_dev(obd->obd_lu_dev);
1454 rc = class_connect(conn, obd, cluuid);
1458 exp = class_conn2export(conn);
1460 med = &exp->exp_mds_data;
1464 GOTO(out, rc = -ENOMEM);
1466 memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
1471 class_disconnect(exp);
1473 class_export_put(exp);
/*
 * ->o_disconnect() method of mdt_obd_device_ops: tear down export @exp.
 *
 * An extra export reference is held (class_export_get() ...
 * class_export_put()) across the teardown.  The export is disconnected
 * first, then every reply still queued on exp_outstanding_replies is
 * unlinked and handed to ptlrpc_schedule_difficult_reply(), and finally
 * the per-client data at med->med_mcd is freed.
 *
 * Locking as written: exp->exp_lock is taken with interrupts saved, and
 * each reply's svc->srv_lock is nested inside it.
 *
 * NOTE(review): the declaration of rc and the final return are elided in
 * this listing; presumably the class_disconnect() result is returned --
 * confirm against the full source.
 */
1479 static int mdt_obd_disconnect(struct obd_export *exp)
1481 struct mds_export_data *med = &exp->exp_mds_data;
1482 unsigned long irqflags;
1487 class_export_get(exp);
1489 /* Disconnect early so that clients can't keep using export */
1490 rc = class_disconnect(exp);
1491 //ldlm_cancel_locks_for_export(exp);
1493 /* complete all outstanding replies */
1494 spin_lock_irqsave(&exp->exp_lock, irqflags);
1495 while (!list_empty(&exp->exp_outstanding_replies)) {
/* always work on the current head of the list */
1496 struct ptlrpc_reply_state *rs =
1497 list_entry(exp->exp_outstanding_replies.next,
1498 struct ptlrpc_reply_state, rs_exp_list);
1499 struct ptlrpc_service *svc = rs->rs_service;
1501 spin_lock(&svc->srv_lock);
/* unlinking rs is what lets the list_empty() loop condition terminate */
1502 list_del_init(&rs->rs_exp_list);
1503 ptlrpc_schedule_difficult_reply(rs);
1504 spin_unlock(&svc->srv_lock);
1506 spin_unlock_irqrestore(&exp->exp_lock, irqflags);
1508 OBD_FREE_PTR(med->med_mcd);
/* drop the reference taken by class_export_get() above */
1510 class_export_put(exp);
/*
 * obd_ops table for the MDT obd type: owned by this module, with the
 * connect and disconnect methods defined in this file.
 */
1514 static struct obd_ops mdt_obd_device_ops = {
1515 .o_owner = THIS_MODULE,
1516 .o_connect = mdt_obd_connect,
1517 .o_disconnect = mdt_obd_disconnect,
/*
 * ->ldto_device_alloc() method: set up an mdt_device for device type @t
 * from configuration @cfg.
 *
 * @l is pointed at the lu_device embedded in the mdt_device
 * (m->mdt_md_dev.md_lu_dev) and the device is initialized by mdt_init0().
 * One error path returns ERR_PTR(result); another assigns
 * ERR_PTR(-ENOMEM) to @l.
 *
 * NOTE(review): the allocation of @m, the conditions selecting the two
 * error paths, any cleanup before "return ERR_PTR(result)" and the final
 * return are elided in this listing -- check that @m is not leaked when
 * mdt_init0() fails.
 */
1520 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1521 struct lustre_cfg *cfg)
1523 struct lu_device *l;
1524 struct mdt_device *m;
1530 l = &m->mdt_md_dev.md_lu_dev;
1531 result = mdt_init0(m, t, cfg);
1534 return ERR_PTR(result);
1538 l = ERR_PTR(-ENOMEM);
/*
 * ->ldto_device_free() method: converts the generic lu_device @d to its
 * mdt_device with mdt_dev().
 *
 * NOTE(review): the statements that finalize and free @m are elided in
 * this listing -- confirm against the full source.
 */
1542 static void mdt_device_free(struct lu_device *d)
1544 struct mdt_device *m = mdt_dev(d);
/*
 * ->lct_init() method of mdt_thread_key: allocates an mdt_thread_info
 * with OBD_ALLOC_PTR() and records the owning context @ctx in mti_ctxt.
 *
 * ERR_PTR(-ENOMEM) is assigned to @info on another path, presumably when
 * the allocation fails (the branch and the return statement are elided in
 * this listing -- confirm).
 */
1550 static void *mdt_thread_init(struct lu_context *ctx)
1552 struct mdt_thread_info *info;
1554 OBD_ALLOC_PTR(info);
1556 info->mti_ctxt = ctx;
1558 info = ERR_PTR(-ENOMEM);
/*
 * ->lct_fini() method of mdt_thread_key: @data is interpreted as the
 * mdt_thread_info associated with @ctx.
 *
 * NOTE(review): the statement releasing @info is elided in this listing;
 * presumably it pairs with the OBD_ALLOC_PTR() in mdt_thread_init() --
 * confirm against the full source.
 */
1562 static void mdt_thread_fini(struct lu_context *ctx, void *data)
1564 struct mdt_thread_info *info = data;
/*
 * lu_context key for MDT per-context data: values are created by
 * mdt_thread_init() and disposed of by mdt_thread_fini().
 */
1568 static struct lu_context_key mdt_thread_key = {
1569 .lct_init = mdt_thread_init,
1570 .lct_fini = mdt_thread_fini
/*
 * ->ldto_init() method: registers mdt_thread_key with
 * lu_context_key_register() and returns that call's result.
 * @t is not referenced by the visible code.
 */
1573 static int mdt_type_init(struct lu_device_type *t)
1575 return lu_context_key_register(&mdt_thread_key);
/*
 * ->ldto_fini() method: undoes mdt_type_init() by passing mdt_thread_key
 * to lu_context_key_degister() (spelled as the callee is named here).
 * @t is not referenced by the visible code.
 */
1578 static void mdt_type_fini(struct lu_device_type *t)
1580 lu_context_key_degister(&mdt_thread_key);
/*
 * Device-type operations for the MDT: type-level init/fini (context key
 * registration) and per-device alloc/free.
 */
1583 static struct lu_device_type_operations mdt_device_type_ops = {
1584 .ldto_init = mdt_type_init,
1585 .ldto_fini = mdt_type_fini,
1587 .ldto_device_alloc = mdt_device_alloc,
1588 .ldto_device_free = mdt_device_free
/*
 * The MDT device type: tagged LU_DEVICE_MD, named LUSTRE_MDT0_NAME and
 * driven by mdt_device_type_ops.  Passed to class_register_type() in
 * mdt_mod_init().
 */
1591 static struct lu_device_type mdt_device_type = {
1592 .ldt_tags = LU_DEVICE_MD,
1593 .ldt_name = LUSTRE_MDT0_NAME,
1594 .ldt_ops = &mdt_device_type_ops
/*
 * lprocfs variable tables for the MDT: one for per-obd entries and one
 * for module-wide entries.  LPROCFS_INIT_VARS() ties both tables to the
 * "mdt" prefix that lprocfs_init_vars(mdt, ...) uses in mdt_mod_init().
 *
 * NOTE(review): the table contents are not visible in this listing.
 */
1597 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
1601 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
1605 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
/*
 * Module entry point: sets mdt_num_threads to MDT_NUM_THREADS, fills
 * @lvars via lprocfs_init_vars() and registers the MDT obd type (ops
 * table, module-level lprocfs vars, name LUSTRE_MDT0_NAME, device type).
 *
 * Returns the result of class_register_type().
 *
 * NOTE(review): mdt_num_threads is also exposed through
 * CFS_MODULE_PARM() in this file; the unconditional assignment below
 * looks like it would overwrite a value supplied at module load time --
 * confirm whether that is intended.
 */
1607 static int __init mdt_mod_init(void)
1609 struct lprocfs_static_vars lvars;
1611 mdt_num_threads = MDT_NUM_THREADS;
1612 lprocfs_init_vars(mdt, &lvars);
1613 return class_register_type(&mdt_obd_device_ops, lvars.module_vars,
1614 LUSTRE_MDT0_NAME, &mdt_device_type);
/*
 * Module exit point: unregisters the obd type that mdt_mod_init()
 * registered under LUSTRE_MDT0_NAME.
 */
1617 static void __exit mdt_mod_exit(void)
1619 class_unregister_type(LUSTRE_MDT0_NAME);
/*
 * DEF_HNDL() expands to one designated initializer of a struct
 * mdt_handler array.  The array index is (prefix_opc - prefix_base),
 * i.e. the opcode's offset from the first opcode of its table, so a
 * handler can be found by subtracting the table's base opcode.
 *
 * Fields set by the visible lines:
 *   mh_fail_id - OBD_FAIL_<prefix>_<opc><suffix>
 *   mh_opc     - <prefix>_<opc>
 *   mh_flags   - @flags
 *
 * NOTE(review): some continuation lines of the macro are elided in this
 * listing; @fn is presumably stored as the handler function there --
 * confirm against the full source.
 */
1623 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn) \
1624 [prefix ## _ ## opc - prefix ## _ ## base] = { \
1626 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
1627 .mh_opc = prefix ## _ ## opc, \
1628 .mh_flags = flags, \
/*
 * DEF_HNDL() specialized for MDS_* opcodes: entries are indexed relative
 * to MDS_GETATTR and their fail ids carry the _NET suffix
 * (OBD_FAIL_MDS_<name>_NET).
 */
1632 #define DEF_MDT_HNDL(flags, name, fn) \
1633 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
/*
 * Handler table for MDS_* opcodes, built with DEF_MDT_HNDL() and so
 * indexed by (opcode - MDS_GETATTR).  The first argument of each entry is
 * the handler's flag word: either 0 or HABEO_CORPUS (the meaning of that
 * flag is defined outside this listing).
 */
1635 static struct mdt_handler mdt_mds_ops[] = {
1636 DEF_MDT_HNDL(0, CONNECT, mdt_connect),
1637 DEF_MDT_HNDL(0, DISCONNECT, mdt_disconnect),
1638 DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus),
1639 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mdt_getattr),
1640 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mdt_getattr_name),
1641 DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mdt_setxattr),
1642 DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mdt_getxattr),
1643 DEF_MDT_HNDL(0, STATFS, mdt_statfs),
1644 DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mdt_readpage),
1645 DEF_MDT_HNDL(0, REINT, mdt_reint),
1646 DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mdt_close),
1647 DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
1648 DEF_MDT_HNDL(0, PIN, mdt_pin),
1649 DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync),
1650 DEF_MDT_HNDL(0, FLD, mdt_fld),
1651 DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck),
1652 DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl)
/*
 * Handler table for the OBD opcode slice (referenced from mdt_handlers).
 * NOTE(review): no entries are visible in this listing.
 */
1655 static struct mdt_handler mdt_obd_ops[] = {
/*
 * DEF_HNDL() specialized for LDLM_* opcodes: entries are indexed relative
 * to LDLM_ENQUEUE and the fail id has no suffix (OBD_FAIL_LDLM_<name>).
 */
1658 #define DEF_DLM_HNDL(flags, name, fn) \
1659 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
/*
 * Handler table for LDLM_* opcodes, built with DEF_DLM_HNDL() and so
 * indexed by (opcode - LDLM_ENQUEUE).  ENQUEUE and CONVERT carry the
 * HABEO_CLAVIS flag, the two callbacks carry no flags (the meaning of the
 * flag is defined outside this listing).
 */
1661 static struct mdt_handler mdt_dlm_ops[] = {
1662 DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
1663 DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT, mdt_convert),
1664 DEF_DLM_HNDL(0, BL_CALLBACK, mdt_bl_callback),
1665 DEF_DLM_HNDL(0, CP_CALLBACK, mdt_cp_callback)
/*
 * Handler table for the LLOG opcode slice (referenced from mdt_handlers).
 * NOTE(review): no entries are visible in this listing.
 */
1668 static struct mdt_handler mdt_llog_ops[] = {
/*
 * Top-level dispatch table: each slice names an opcode range
 * (mos_opc_start .. mos_opc_end) and the handler array serving it.
 * Every slice starts at the same opcode its handler array is indexed
 * from (see DEF_MDT_HNDL / DEF_DLM_HNDL).
 *
 * NOTE(review): the braces separating the slices and any terminating
 * entry are elided in this listing; whether mos_opc_end is inclusive is
 * not visible here -- confirm in the lookup code.
 */
1671 static struct mdt_opc_slice mdt_handlers[] = {
/* MDS_* requests */
1673 .mos_opc_start = MDS_GETATTR,
1674 .mos_opc_end = MDS_LAST_OPC,
1675 .mos_hs = mdt_mds_ops
/* OBD_* requests */
1678 .mos_opc_start = OBD_PING,
1679 .mos_opc_end = OBD_LAST_OPC,
1680 .mos_hs = mdt_obd_ops
/* LDLM_* requests */
1683 .mos_opc_start = LDLM_ENQUEUE,
1684 .mos_opc_end = LDLM_LAST_OPC,
1685 .mos_hs = mdt_dlm_ops
/* LLOG_* requests */
1688 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1689 .mos_opc_end = LLOG_LAST_OPC,
1690 .mos_hs = mdt_llog_ops
/*
 * Module metadata and registration.
 *
 * mdt_num_threads is exported as an unsigned-long module parameter with
 * mode 0444.  cfs_module() names the module "mdt", version "0.0.4", and
 * binds mdt_mod_init()/mdt_mod_exit() as its entry and exit points.
 */
1697 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1698 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1699 MODULE_LICENSE("GPL");
1701 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1702 "number of mdt service threads to start");
1704 cfs_module(mdt, "0.0.4", mdt_mod_init, mdt_mod_exit);