1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
14 * This file is part of the Lustre file system, http://www.lustre.org
15 * Lustre is a trademark of Cluster File Systems, Inc.
17 * You may have signed or agreed to another license before downloading
18 * this software. If so, you are bound by the terms and conditions
19 * of that agreement, and the following does not apply to you. See the
20 * LICENSE file included with this distribution for more information.
22 * If you did not agree to a different license, then this copy of Lustre
23 * is open source software; you can redistribute it and/or modify it
24 * under the terms of version 2 of the GNU General Public License as
25 * published by the Free Software Foundation.
27 * In either case, Lustre is distributed in the hope that it will be
28 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 * license text for more details.
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_MDS
38 #include <linux/module.h>
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
43 * struct OBD_{ALLOC,FREE}*()
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
54 /* struct mds_client_data */
55 #include "../mds/mds_internal.h"
56 #include "mdt_internal.h"
59 * Initialized in mdt_mod_init().
61 unsigned long mdt_num_threads;
63 static int mdt_handle(struct ptlrpc_request *req);
64 static struct ptlrpc_thread_key mdt_thread_key;
66 static int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
67 struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
70 struct mdt_object *child;
71 struct mdt_lock_handle *lh;
75 lh = &info->mti_lh[MDT_LH_PARENT];
76 lh->mlh_mode = LCK_PW;
78 o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
82 child = mdt_object_find(d, cfid);
84 result = mdt_child_ops(d)->mdo_mkdir(mdt_object_child(o), name,
85 mdt_object_child(child));
86 mdt_object_put(child);
88 result = PTR_ERR(child);
89 mdt_object_unlock(d->mdt_namespace, o, lh);
/*
 * Look up the object identified by @fid on the device taken from
 * info->mti_mdt and call the child layer's mdo_attr_get() on it.
 *
 * NOTE(review): the visible statements use `name`, `child` and `lh`, none of
 * which is a parameter of this function, and @attr is not referenced on any
 * visible line. The body looks copied from mdt_mkdir() -- confirm and rework
 * so that it passes @attr to mdo_attr_get().
 */
94 static int mdt_md_getattr(struct mdt_thread_info *info, struct lu_fid *fid,
95 struct md_object_attr *attr)
97 struct mdt_device *d = info->mti_mdt;
/* plain lookup: no lock is requested here (compare mdt_object_find_lock()) */
102 o = mdt_object_find(d, fid);
106 result = mdt_child_ops(d)->mdo_attr_get(mdt_object_child(o), name,
107 mdt_object_child(child));
108 mdt_object_put(child);
110 result = PTR_ERR(child);
/*
 * NOTE(review): unlocks through `lh` although the object above was obtained
 * with mdt_object_find(), not mdt_object_find_lock() -- confirm.
 */
111 mdt_object_unlock(d->mdt_namespace, o, lh);
116 static int mdt_getstatus(struct mdt_thread_info *info,
117 struct ptlrpc_request *req, int offset)
119 struct md_device *mdd = info->mti_mdt->mdt_child;
120 struct mdt_body *body;
121 int size = sizeof *body;
126 result = lustre_pack_reply(req, 1, &size, NULL);
128 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
130 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
133 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
134 result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
137 /* the last_committed and last_xid fields are filled in for all
138 * replies already - no need to do so here also.
143 static int mdt_statfs(struct mdt_thread_info *info,
144 struct ptlrpc_request *req, int offset)
146 struct md_device *child = info->mti_mdt->mdt_child;
147 struct obd_statfs *osfs;
150 int size = sizeof(struct obd_statfs);
154 result = lustre_pack_reply(req, 1, &size, NULL);
156 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
158 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
159 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
162 osfs = lustre_msg_buf(req->rq_repmsg, 0, size);
163 /* XXX max_age optimisation is needed here. See mds_statfs */
164 result = child->md_ops->mdo_statfs(child, &sfs);
165 statfs_pack(osfs, &sfs);
/*
 * Getattr handler: packs a one-buffer reply sized for struct mdt_body, then
 * re-reads the request body at @offset and calls mdt_md_getattr().
 *
 * NOTE(review): a second definition header for mdt_getattr() follows this
 * function in the file -- confirm which one is meant to be compiled.
 */
171 static int mdt_getattr(struct mdt_thread_info *info,
172 struct ptlrpc_request *req, int offset)
174 struct mdt_body *body;
175 int size = sizeof (*body);
/*
 * NOTE(review): declared as struct md_obj_attr, while mdt_md_getattr() takes
 * a struct md_object_attr pointer; `attr` is also not passed on below.
 */
176 struct md_obj_attr attr;
181 result = lustre_pack_reply(req, 1, &size, NULL);
/* NOTE(review): both messages below say "statfs" in the getattr path. */
183 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
185 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
186 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
/*
 * NOTE(review): `body` is dereferenced immediately after the unpack with no
 * NULL check in between; mdt_req_handle() treats a NULL result from the same
 * call as "Can't unpack body".
 */
189 body = lustre_swab_reqbuf(req, offset, size,
190 lustre_swab_mdt_body);
/*
 * NOTE(review): mdt_md_getattr() is declared with three parameters
 * (info, struct lu_fid *, attr pointer); this call passes two and hands over
 * body->fid1 itself rather than its address (compare &body->fid1 in
 * mdt_getstatus()).
 */
191 result = mdt_md_getattr(info, body->fid1);
197 static int mdt_getattr(struct mdt_thread_info *info,
198 struct ptlrpc_request *req, int offset)
203 static int mdt_connect(struct mdt_thread_info *info,
204 struct ptlrpc_request *req, int offset)
206 return target_handle_connect(req, mdt_handle);
209 static int mdt_disconnect(struct mdt_thread_info *info,
210 struct ptlrpc_request *req, int offset)
215 static int mdt_getattr_name(struct mdt_thread_info *info,
216 struct ptlrpc_request *req, int offset)
221 static int mdt_setxattr(struct mdt_thread_info *info,
222 struct ptlrpc_request *req, int offset)
227 static int mdt_getxattr(struct mdt_thread_info *info,
228 struct ptlrpc_request *req, int offset)
233 static int mdt_readpage(struct mdt_thread_info *info,
234 struct ptlrpc_request *req, int offset)
239 static int mdt_reint(struct mdt_thread_info *info,
240 struct ptlrpc_request *req, int offset)
245 static int mdt_close(struct mdt_thread_info *info,
246 struct ptlrpc_request *req, int offset)
251 static int mdt_done_writing(struct mdt_thread_info *info,
252 struct ptlrpc_request *req, int offset)
257 static int mdt_pin(struct mdt_thread_info *info,
258 struct ptlrpc_request *req, int offset)
263 static int mdt_sync(struct mdt_thread_info *info,
264 struct ptlrpc_request *req, int offset)
269 static int mdt_set_info(struct mdt_thread_info *info,
270 struct ptlrpc_request *req, int offset)
275 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
276 struct ptlrpc_request *req, int offset)
281 static int mdt_handle_quotactl(struct mdt_thread_info *info,
282 struct ptlrpc_request *req, int offset)
291 static struct ldlm_callback_suite cbs = {
292 .lcs_completion = ldlm_server_completion_ast,
293 .lcs_blocking = ldlm_server_blocking_ast,
297 static int mdt_enqueue(struct mdt_thread_info *info,
298 struct ptlrpc_request *req, int offset)
301 * info->mti_dlm_req already contains swapped and (if necessary)
302 * converted dlm request.
304 LASSERT(info->mti_dlm_req);
306 info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
307 return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
310 static int mdt_convert(struct mdt_thread_info *info,
311 struct ptlrpc_request *req, int offset)
313 LASSERT(info->mti_dlm_req);
314 return ldlm_handle_convert0(req, info->mti_dlm_req);
317 static int mdt_bl_callback(struct mdt_thread_info *info,
318 struct ptlrpc_request *req, int offset)
320 CERROR("bl callbacks should not happen on MDS\n");
325 static int mdt_cp_callback(struct mdt_thread_info *info,
326 struct ptlrpc_request *req, int offset)
328 CERROR("cp callbacks should not happen on MDS\n");
334 * Build (DLM) resource name from fid.
336 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
337 struct ldlm_res_id *name)
339 memset(name, 0, sizeof *name);
340 /* we use fid_num() whoch includes also object version instread of raw
342 name->name[0] = fid_seq(f);
343 name->name[1] = fid_num(f);
348 * Return true if resource is for object identified by fid.
350 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
352 return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
355 /*
 * Enqueue an LDLM_IBITS lock of mode @mode with policy @policy in namespace
 * @ns, on the resource name built from fid @f by fid_build_res_name().
 * @lh is handed to ldlm_cli_enqueue() as the lock handle argument.
 *
 * Returns 0 when the enqueue returns ELDLM_OK and -EIO for any other
 * result, so the specific enqueue error is not visible to the caller.
 */
356 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
357 struct lustre_handle *lh, ldlm_mode_t mode,
358 ldlm_policy_data_t *policy)
360 struct ldlm_res_id res_id;
368 /* FIXME: is that correct to have @flags=0 here? */
/* res_id is a local scratch buffer; the built name is passed by value */
369 rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
370 LDLM_IBITS, policy, mode, &flags,
371 ldlm_blocking_ast, ldlm_completion_ast, NULL,
372 NULL, NULL, 0, NULL, lh);
373 RETURN (rc == ELDLM_OK ? 0 : -EIO);
376 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
377 struct lustre_handle *lh, ldlm_mode_t mode)
379 struct ldlm_lock *lock;
382 /* FIXME: this is debug stuff, remove it later. */
383 lock = ldlm_handle2lock(lh);
385 CERROR("invalid lock handle "LPX64, lh->cookie);
389 LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
391 ldlm_lock_decref(lh, mode);
395 static struct lu_device_operations mdt_lu_ops;
397 static int lu_device_is_mdt(struct lu_device *d)
400 * XXX for now. Tags in lu_device_type->ldt_something are needed.
402 return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
405 static struct mdt_object *mdt_obj(struct lu_object *o)
407 LASSERT(lu_device_is_mdt(o->lo_dev));
408 return container_of(o, struct mdt_object, mot_obj.mo_lu);
411 struct mdt_object *mdt_object_find(struct mdt_device *d,
416 o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f);
418 return (struct mdt_object *)o;
423 void mdt_object_put(struct mdt_object *o)
425 lu_object_put(&o->mot_obj.mo_lu);
428 struct lu_fid *mdt_object_fid(struct mdt_object *o)
430 return lu_object_fid(&o->mot_obj.mo_lu);
433 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
434 struct mdt_lock_handle *lh, __u64 ibits)
436 ldlm_policy_data_t p = {
441 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
442 LASSERT(lh->mlh_mode != LCK_MINMODE);
444 return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
447 void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
448 struct mdt_lock_handle *lh)
450 if (lustre_handle_is_used(&lh->mlh_lh)) {
451 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
452 lh->mlh_lh.cookie = 0;
456 struct mdt_object *mdt_object_find_lock(struct mdt_device *d,
458 struct mdt_lock_handle *lh,
461 struct mdt_object *o;
463 o = mdt_object_find(d, f);
467 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
481 int (*mh_act)(struct mdt_thread_info *info,
482 struct ptlrpc_request *req, int offset);
/*
 * Per-handler flags; mdt_req_handle() tests them in h->mh_flags to decide
 * which request buffer to unpack before calling the handler.
 */
enum mdt_handler_flags {
        /* struct mdt_body is passed in the 0-th incoming buffer. */
        HABEO_CORPUS = 0x1,
        /*
         * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
         * incoming buffer.
         */
        HABEO_CLAVIS = 0x2
};
497 struct mdt_opc_slice {
500 struct mdt_handler *mos_hs;
503 static struct mdt_opc_slice mdt_handlers[];
505 static struct mdt_handler *mdt_handler_find(__u32 opc)
507 struct mdt_opc_slice *s;
508 struct mdt_handler *h;
511 for (s = mdt_handlers; s->mos_hs != NULL; s++) {
512 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
513 h = s->mos_hs + (opc - s->mos_opc_start);
515 LASSERT(h->mh_opc == opc);
517 h = NULL; /* unsupported opc */
524 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
526 return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
529 static int mdt_lock_resname_compat(struct mdt_device *m,
530 struct ldlm_request *req)
532 /* XXX something... later. */
536 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
538 /* XXX something... later. */
543 * Invoke handler for this request opc. Also do necessary preprocessing
544 * (according to handler ->mh_flags), and post-processing (setting of
545 * ->last_{xid,committed}).
/*
 * Visible flow: assert that the handler matches the request opc, honour the
 * handler's fail id, unpack the request body (HABEO_CORPUS) or the DLM
 * request (lock_conv), call h->mh_act(), store the result in
 * req->rq_status, then fill in last_xid / last_committed.
 */
547 static int mdt_req_handle(struct mdt_thread_info *info,
548 struct mdt_handler *h, struct ptlrpc_request *req,
557 LASSERT(h->mh_act != NULL);
558 LASSERT(h->mh_opc == req->rq_reqmsg->opc);
559 LASSERT(current->journal_info == NULL);
561 DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
563 if (h->mh_fail_id != 0)
564 OBD_FAIL_RETURN(h->mh_fail_id, 0);
/* offset handed to the handler: record offset plus the caller's shift */
566 off = MDS_REQ_REC_OFF + shift;
/*
 * Right-hand side of an assignment whose target is not visible here;
 * presumably lock_conv, which is tested below -- TODO confirm.
 */
568 h->mh_flags & HABEO_CLAVIS &&
569 info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME;
572 if (h->mh_flags & HABEO_CORPUS) {
573 struct mdt_body *body;
575 body = info->mti_body =
576 lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
577 lustre_swab_mdt_body);
/* the object found here is released later by mdt_thread_info_fini() */
579 info->mti_object = mdt_object_find(info->mti_mdt,
581 if (IS_ERR(info->mti_object))
582 result = PTR_ERR(info->mti_object);
584 CERROR("Can't unpack body\n");
/*
 * NOTE(review): info->mti_dlm_req is filled in only on this branch, i.e.
 * only when lock_conv is set. mdt_enqueue() and mdt_convert() both
 * LASSERT(info->mti_dlm_req), so confirm the request is unpacked for every
 * HABEO_CLAVIS handler and not just in the compat case.
 */
587 } else if (lock_conv) {
588 struct ldlm_request *dlm;
591 dlm = info->mti_dlm_req =
592 lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
594 lustre_swab_ldlm_request);
596 result = mdt_lock_resname_compat(info->mti_mdt, dlm);
598 CERROR("Can't unpack dlm request\n");
606 result = h->mh_act(info, req, off);
608 * XXX result value is unconditionally shoved into ->rq_status
609 * (original code sometimes placed error code into ->rq_status, and
610 * sometimes returned it to the
611 * caller). ptlrpc_server_handle_request() doesn't check return value
614 req->rq_status = result;
616 LASSERT(current->journal_info == NULL);
619 struct ldlm_reply *rep;
621 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
623 result = mdt_lock_reply_compat(info->mti_mdt, rep);
626 /* If we're DISCONNECTing, the mds_export_data is already freed */
627 if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
/*
 * NOTE(review): last_xid is written into the *request* message (rq_reqmsg),
 * although the function comment describes this step as post-processing and
 * the compat code above works on rq_repmsg. rq_repmsg looks like the
 * intended target -- confirm, and make sure the reply has been packed
 * whenever this line is reached.
 */
628 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
629 target_committed_to_req(req);
634 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
636 lh->mlh_lh.cookie = 0ull;
637 lh->mlh_mode = LCK_MINMODE;
640 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
642 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
645 static void mdt_thread_info_init(struct mdt_thread_info *info)
649 memset(info, 0, sizeof *info);
650 info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
654 for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
655 info->mti_rep_buf_size[i] = ~0;
656 info->mti_rep_buf_nr = i;
657 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
658 mdt_lock_handle_init(&info->mti_lh[i]);
661 static void mdt_thread_info_fini(struct mdt_thread_info *info)
665 if (info->mti_object != NULL) {
666 mdt_object_put(info->mti_object);
667 info->mti_object = NULL;
669 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
670 mdt_lock_handle_fini(&info->mti_lh[i]);
673 static int mds_msg_check_version(struct lustre_msg *msg)
677 /* TODO: enable the below check while really introducing msg version.
678 * it's disabled because it will break compatibility with b1_4.
686 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
688 CERROR("bad opc %u version %08x, expecting %08x\n",
689 msg->opc, msg->version, LUSTRE_OBD_VERSION);
693 case MDS_GETATTR_NAME:
698 case MDS_DONE_WRITING:
708 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
710 CERROR("bad opc %u version %08x, expecting %08x\n",
711 msg->opc, msg->version, LUSTRE_MDS_VERSION);
715 case LDLM_BL_CALLBACK:
716 case LDLM_CP_CALLBACK:
717 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
719 CERROR("bad opc %u version %08x, expecting %08x\n",
720 msg->opc, msg->version, LUSTRE_DLM_VERSION);
723 case LLOG_ORIGIN_HANDLE_CREATE:
724 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
725 case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
726 case LLOG_ORIGIN_HANDLE_READ_HEADER:
727 case LLOG_ORIGIN_HANDLE_CLOSE:
729 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
731 CERROR("bad opc %u version %08x, expecting %08x\n",
732 msg->opc, msg->version, LUSTRE_LOG_VERSION);
735 CERROR("MDS unknown opcode %d\n", msg->opc);
741 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
742 struct obd_device *obd, int *process)
744 switch (req->rq_reqmsg->opc) {
745 case MDS_CONNECT: /* This will never get here, but for completeness. */
746 case OST_CONNECT: /* This will never get here, but for completeness. */
753 case MDS_SYNC: /* used in unmounting */
757 *process = target_queue_recovery_request(req, obd);
761 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
763 /* XXX what should we set rq_status to here? */
764 req->rq_status = -EAGAIN;
765 RETURN(ptlrpc_error(req));
770 * Handle recovery. Return:
771 * +1: continue request processing;
772 * -ve: abort immediately with the given error code;
773 * 0: send reply with error code in req->rq_status;
775 static int mdt_recovery(struct ptlrpc_request *req)
779 struct obd_device *obd;
783 if (req->rq_reqmsg->opc == MDS_CONNECT)
786 if (req->rq_export == NULL) {
787 CERROR("operation %d on unconnected MDS from %s\n",
789 libcfs_id2str(req->rq_peer));
790 req->rq_status = -ENOTCONN;
794 /* sanity check: if the xid matches, the request must be marked as a
795 * resent or replayed */
796 LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
797 lustre_msg_get_flags(req->rq_reqmsg) &
798 (MSG_RESENT | MSG_REPLAY)),
799 "rq_xid "LPU64" matches last_xid, "
800 "expected RESENT flag\n", req->rq_xid);
802 /* else: note the opposite is not always true; a RESENT req after a
803 * failover will usually not match the last_xid, since it was likely
804 * never committed. A REPLAYed request will almost never match the
805 * last xid, however it could for a committed, but still retained,
808 obd = req->rq_export->exp_obd;
810 /* Check for aborted recovery... */
811 spin_lock_bh(&obd->obd_processing_task_lock);
812 abort_recovery = obd->obd_abort_recovery;
813 recovering = obd->obd_recovering;
814 spin_unlock_bh(&obd->obd_processing_task_lock);
815 if (abort_recovery) {
816 target_abort_recovery(obd);
817 } else if (recovering) {
821 rc = mdt_filter_recovery_request(req, obd, &should_process);
822 if (rc != 0 || !should_process) {
830 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
832 struct obd_device *obd;
834 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
835 if (req->rq_reqmsg->opc != OBD_PING)
836 DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
838 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
839 if (obd && obd->obd_recovering) {
840 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
841 RETURN(target_queue_final_reply(req, req->rq_status));
843 /* Lost a race with recovery; let the error path
845 req->rq_status = -ENOTCONN;
848 target_send_reply(req, req->rq_status, info->mti_fail_id);
849 RETURN(req->rq_status);
852 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
854 struct mdt_handler *h;
855 struct lustre_msg *msg;
860 OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
862 LASSERT(current->journal_info == NULL);
864 msg = req->rq_reqmsg;
865 result = mds_msg_check_version(msg);
867 result = mdt_recovery(req);
870 h = mdt_handler_find(msg->opc);
872 result = mdt_req_handle(info, h, req, 0);
874 req->rq_status = -ENOTSUPP;
875 result = ptlrpc_error(req);
880 result = mdt_reply(req, info);
883 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
887 static struct mdt_device *mdt_dev(struct lu_device *d)
889 LASSERT(lu_device_is_mdt(d));
890 return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
893 static int mdt_handle(struct ptlrpc_request *req)
897 struct mdt_thread_info *info = ptlrpc_thread_key_get(req->rq_svc_thread,
901 mdt_thread_info_init(info);
902 /* it can be NULL while CONNECT */
904 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
906 result = mdt_handle0(req, info);
907 mdt_thread_info_fini(info);
911 static int mdt_intent_policy(struct ldlm_namespace *ns,
912 struct ldlm_lock **lockp, void *req_cookie,
913 ldlm_mode_t mode, int flags, void *data)
916 RETURN(ELDLM_LOCK_ABORTED);
919 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
920 svc_handler_t h, char *name,
921 struct proc_dir_entry *proc_entry,
922 svcreq_printfn_t prntfn)
924 return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
925 c->psc_max_req_size, c->psc_max_reply_size,
926 c->psc_req_portal, c->psc_rep_portal,
927 c->psc_watchdog_timeout,
929 prntfn, c->psc_num_threads);
932 static int mdt_config(struct mdt_device *m, const char *name,
933 void *buf, int size, int mode)
935 struct md_device *child = m->mdt_child;
939 if (!child->md_ops->mdo_config)
942 rc = child->md_ops->mdo_config(child, name, buf, size, mode);
946 /* allocate sequence to client */
947 int mdt_seq_alloc(struct mdt_device *m, __u64 *seq)
953 LASSERT(seq != NULL);
955 down(&m->mdt_seq_sem);
959 /* update new allocated sequence on store */
960 rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
961 &m->mdt_seq, sizeof(m->mdt_seq),
964 CERROR("can't save new seq, rc %d\n",
972 EXPORT_SYMBOL(mdt_seq_alloc);
974 /* Initialize the meta-sequence. First try to read it from the lower layers,
975 * down to the backing store. If this is the first run and no meta-sequence
976 * has been initialized yet, store the initial value to the backing store. */
977 static int mdt_seq_init(struct mdt_device *m)
982 /* allocate next seq after root one */
983 m->mdt_seq = LUSTRE_ROOT_FID_SEQ + 1;
985 rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
986 &m->mdt_seq, sizeof(m->mdt_seq),
989 if (rc == -EOPNOTSUPP) {
990 /* provide zero error and let continue with default value of
993 } else if (rc == -ENODATA) {
994 CWARN("initialize new sequence\n");
996 /*initialize new sequence config as it is not yet created. */
997 rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
998 &m->mdt_seq, sizeof(m->mdt_seq),
1000 if (rc == -EOPNOTSUPP) {
1001 /* provide zero error and let continue with default
1002 * value of sequence. */
1003 CERROR("can't update save initial sequence. "
1004 "No method defined\n");
1007 CERROR("can't update config %s, rc %d\n",
1008 LUSTRE_CONFIG_METASEQ, rc);
1012 CERROR("can't get config %s, rc %d\n",
1013 LUSTRE_CONFIG_METASEQ, rc);
1020 CWARN("last used sequence: "LPU64"\n", m->mdt_seq);
1024 static void mdt_fini(struct mdt_device *m)
1026 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1028 if (d->ld_site != NULL) {
1029 lu_site_fini(d->ld_site);
1030 OBD_FREE_PTR(d->ld_site);
1033 if (m->mdt_service != NULL) {
1034 ptlrpc_unregister_service(m->mdt_service);
1035 m->mdt_service = NULL;
1037 if (m->mdt_namespace != NULL) {
1038 ldlm_namespace_free(m->mdt_namespace, 0);
1039 m->mdt_namespace = NULL;
1041 /* finish the stack */
1043 struct lu_device *child = md2lu_dev(m->mdt_child);
1044 child->ld_type->ldt_ops->ldto_device_fini(child);
1047 LASSERT(atomic_read(&d->ld_ref) == 0);
1048 md_device_fini(&m->mdt_md_dev);
1051 static int mdt_init0(struct mdt_device *m,
1052 struct lu_device_type *t, struct lustre_cfg *cfg)
1057 struct obd_device *obd;
1058 struct lu_device *mdt_child;
1059 const char *top = lustre_cfg_string(cfg, 0);
1060 const char *child = lustre_cfg_string(cfg, 1);
1064 /* get next layer */
1065 obd = class_name2obd((char *)child);
1066 if (obd && obd->obd_lu_dev) {
1067 CDEBUG(D_INFO, "Child device is %s\n", child);
1068 m->mdt_child = lu2md_dev(obd->obd_lu_dev);
1069 mdt_child = md2lu_dev(m->mdt_child);
1071 CDEBUG(D_INFO, "Child device %s is not found\n", child);
1079 md_device_init(&m->mdt_md_dev, t);
1080 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1081 lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
1083 sema_init(&m->mdt_seq_sem, 1);
1085 m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
1086 m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE;
1087 m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE;
1088 m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE;
1089 m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL;
1090 m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL;
1091 m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
1093 * We'd like to have a mechanism to set this on a per-device basis,
1096 m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
1099 snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1100 m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1101 if (m->mdt_namespace == NULL)
1102 GOTO(err_fini_site, rc = -ENOMEM);
1104 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1106 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1107 "mdt_ldlm_client", &m->mdt_ldlm_client);
1110 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1112 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1114 if (m->mdt_service == NULL)
1115 GOTO(err_free_ns, rc = -ENOMEM);
1117 /* init the stack */
1118 LASSERT(mdt_child->ld_type->ldt_ops->ldto_device_init != NULL);
1119 rc = mdt_child->ld_type->ldt_ops->ldto_device_init(mdt_child, top);
1121 CERROR("can't init device stack, rc %d\n", rc);
1122 GOTO(err_free_svc, rc);
1125 /* init sequence info after device stack is initialized. */
1126 rc = mdt_seq_init(m);
1128 GOTO(err_fini_child, rc);
1130 rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1132 GOTO(err_fini_child, rc);
1137 mdt_child->ld_type->ldt_ops->ldto_device_fini(mdt_child);
1139 ptlrpc_unregister_service(m->mdt_service);
1140 m->mdt_service = NULL;
1142 ldlm_namespace_free(m->mdt_namespace, 0);
1143 m->mdt_namespace = NULL;
1150 static struct lu_object *mdt_object_alloc(struct lu_device *d)
1152 struct mdt_object *mo;
1156 struct lu_object *o;
1157 struct lu_object_header *h;
1159 o = &mo->mot_obj.mo_lu;
1160 h = &mo->mot_header;
1161 lu_object_header_init(h);
1162 lu_object_init(o, h, d);
1163 lu_object_add_top(h, o);
1169 static int mdt_object_init(struct lu_object *o)
1171 struct mdt_device *d = mdt_dev(o->lo_dev);
1172 struct lu_device *under;
1173 struct lu_object *below;
1175 under = &d->mdt_child->md_lu_dev;
1176 below = under->ld_ops->ldo_object_alloc(under);
1177 if (below != NULL) {
1178 lu_object_add(o, below);
1184 static void mdt_object_free(struct lu_object *o)
1186 struct lu_object_header *h;
1190 lu_object_header_fini(h);
1193 static void mdt_object_release(struct lu_object *o)
1197 static int mdt_object_print(struct seq_file *f, const struct lu_object *o)
1199 return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
1202 static struct lu_device_operations mdt_lu_ops = {
1203 .ldo_object_alloc = mdt_object_alloc,
1204 .ldo_object_init = mdt_object_init,
1205 .ldo_object_free = mdt_object_free,
1206 .ldo_object_release = mdt_object_release,
1207 .ldo_object_print = mdt_object_print
1210 /* mds_connect copy */
1211 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
1212 struct obd_uuid *cluuid, struct obd_connect_data *data)
1214 struct obd_export *exp;
1215 int rc, abort_recovery;
1216 struct mds_export_data *med;
1217 struct mds_client_data *mcd = NULL;
1221 if (!conn || !obd || !cluuid)
1224 /* Check for aborted recovery. */
1225 spin_lock_bh(&obd->obd_processing_task_lock);
1226 abort_recovery = obd->obd_abort_recovery;
1227 spin_unlock_bh(&obd->obd_processing_task_lock);
1229 target_abort_recovery(obd);
1231 /* XXX There is a small race between checking the list and adding a
1232 * new connection for the same UUID, but the real threat (list
1233 * corruption when multiple different clients connect) is solved.
1235 * There is a second race between adding the export to the list,
1236 * and filling in the client data below. Hence skipping the case
1237 * of NULL mcd above. We should already be controlling multiple
1238 * connects at the client, and we can't hold the spinlock over
1239 * memory allocations without risk of deadlocking.
1241 rc = class_connect(conn, obd, cluuid);
1244 exp = class_conn2export(conn);
1246 med = &exp->exp_mds_data;
1248 OBD_ALLOC(mcd, sizeof(*mcd));
1250 GOTO(out, rc = -ENOMEM);
1252 memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
1255 rc = mdt_seq_alloc(mdt_dev(obd->obd_lu_dev),
1262 OBD_FREE(mcd, sizeof(*mcd));
1263 med->med_mcd = NULL;
1265 class_disconnect(exp);
1267 class_export_put(exp);
1273 static struct obd_ops mdt_obd_device_ops = {
1274 .o_owner = THIS_MODULE,
1275 .o_connect = mdt_obd_connect
1278 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1279 struct lustre_cfg *cfg)
1281 struct lu_device *l;
1282 struct mdt_device *m;
1288 l = &m->mdt_md_dev.md_lu_dev;
1289 result = mdt_init0(m, t, cfg);
1292 return ERR_PTR(result);
1296 l = ERR_PTR(-ENOMEM);
1300 static void mdt_device_free(struct lu_device *d)
1302 struct mdt_device *m = mdt_dev(d);
1308 static void *mdt_thread_init(struct ptlrpc_thread *t)
1310 struct mdt_thread_info *info;
1312 return OBD_ALLOC_PTR(info) ? : ERR_PTR(-ENOMEM);
1315 static void mdt_thread_fini(struct ptlrpc_thread *t, void *data)
1317 struct mdt_thread_info *info = data;
1321 static struct ptlrpc_thread_key mdt_thread_key = {
1322 .ptk_init = mdt_thread_init,
1323 .ptk_fini = mdt_thread_fini
1326 static int mdt_type_init(struct lu_device_type *t)
1328 return ptlrpc_thread_key_register(&mdt_thread_key);
1331 static void mdt_type_fini(struct lu_device_type *t)
1335 static struct lu_device_type_operations mdt_device_type_ops = {
1336 .ldto_init = mdt_type_init,
1337 .ldto_fini = mdt_type_fini,
1339 .ldto_device_alloc = mdt_device_alloc,
1340 .ldto_device_free = mdt_device_free
1343 static struct lu_device_type mdt_device_type = {
1344 .ldt_tags = LU_DEVICE_MD,
1345 .ldt_name = LUSTRE_MDT0_NAME,
1346 .ldt_ops = &mdt_device_type_ops
1349 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
1353 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
1357 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
1359 static int __init mdt_mod_init(void)
1361 struct lprocfs_static_vars lvars;
1363 mdt_num_threads = MDT_NUM_THREADS;
1364 lprocfs_init_vars(mdt, &lvars);
1365 return class_register_type(&mdt_obd_device_ops, lvars.module_vars,
1366 LUSTRE_MDT0_NAME, &mdt_device_type);
1369 static void __exit mdt_mod_exit(void)
1371 class_unregister_type(LUSTRE_MDT0_NAME);
1375 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn) \
1376 [prefix ## _ ## opc - prefix ## _ ## base] = { \
1378 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
1379 .mh_opc = prefix ## _ ## opc, \
1380 .mh_flags = flags, \
1384 #define DEF_MDT_HNDL(flags, name, fn) \
1385 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
1387 static struct mdt_handler mdt_mds_ops[] = {
1388 DEF_MDT_HNDL(0, CONNECT, mdt_connect),
1389 DEF_MDT_HNDL(0, DISCONNECT, mdt_disconnect),
1390 DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus),
1391 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mdt_getattr),
1392 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mdt_getattr_name),
1393 DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mdt_setxattr),
1394 DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mdt_getxattr),
1395 DEF_MDT_HNDL(0, STATFS, mdt_statfs),
1396 DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mdt_readpage),
1397 DEF_MDT_HNDL(0, REINT, mdt_reint),
1398 DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mdt_close),
1399 DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
1400 DEF_MDT_HNDL(0, PIN, mdt_pin),
1401 DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync),
1402 DEF_MDT_HNDL(0, SET_INFO, mdt_set_info),
1403 DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck),
1404 DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl)
1407 static struct mdt_handler mdt_obd_ops[] = {
1410 #define DEF_DLM_HNDL(flags, name, fn) \
1411 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
1413 static struct mdt_handler mdt_dlm_ops[] = {
1414 DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
1415 DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT, mdt_convert),
1416 DEF_DLM_HNDL(0, BL_CALLBACK, mdt_bl_callback),
1417 DEF_DLM_HNDL(0, CP_CALLBACK, mdt_cp_callback)
1420 static struct mdt_handler mdt_llog_ops[] = {
1423 static struct mdt_opc_slice mdt_handlers[] = {
1425 .mos_opc_start = MDS_GETATTR,
1426 .mos_opc_end = MDS_LAST_OPC,
1427 .mos_hs = mdt_mds_ops
1430 .mos_opc_start = OBD_PING,
1431 .mos_opc_end = OBD_LAST_OPC,
1432 .mos_hs = mdt_obd_ops
1435 .mos_opc_start = LDLM_ENQUEUE,
1436 .mos_opc_end = LDLM_LAST_OPC,
1437 .mos_hs = mdt_dlm_ops
1440 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1441 .mos_opc_end = LLOG_LAST_OPC,
1442 .mos_hs = mdt_llog_ops
1449 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1450 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1451 MODULE_LICENSE("GPL");
1453 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1454 "number of mdt service threads to start");
1456 cfs_module(mdt, "0.0.3", mdt_mod_init, mdt_mod_exit);