1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
14 * This file is part of the Lustre file system, http://www.lustre.org
15 * Lustre is a trademark of Cluster File Systems, Inc.
17 * You may have signed or agreed to another license before downloading
18 * this software. If so, you are bound by the terms and conditions
19 * of that agreement, and the following does not apply to you. See the
20 * LICENSE file included with this distribution for more information.
22 * If you did not agree to a different license, then this copy of Lustre
23 * is open source software; you can redistribute it and/or modify it
24 * under the terms of version 2 of the GNU General Public License as
25 * published by the Free Software Foundation.
27 * In either case, Lustre is distributed in the hope that it will be
28 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 * license text for more details.
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_MDS
38 #include <linux/module.h>
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
43 * struct OBD_{ALLOC,FREE}*()
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
54 #include <linux/lu_object.h>
56 /* struct mds_client_data */
57 #include "../mds/mds_internal.h"
58 #include "mdt_internal.h"
/* Thread-count tunable; it is also exported as a module parameter at the end of the file. */
61 * Initialized in mdt_mod_init().
63 unsigned long mdt_num_threads;
/* Declared early: mdt_connect() hands mdt_handle to target_handle_connect(). */
65 static int mdt_handle(struct ptlrpc_request *req);
/* Forward declaration; the initialised definition appears further down. */
66 static struct ptlrpc_thread_key mdt_thread_key;
/*
 * Handler registered for the GETSTATUS opcode. It packs a one-buffer reply
 * sized for struct mdt_body, then asks the child md device (mdo_root_get)
 * for the root fid, which is written into body->fid1.
 *
 * NOTE(review): the local declarations and the branch keywords around the
 * error checks sit on lines that are not visible in this listing.
 */
68 static int mdt_getstatus(struct mdt_thread_info *info,
69 struct ptlrpc_request *req, int offset)
71 struct md_device *mdd = info->mti_mdt->mdt_child;
72 struct mdt_body *body;
73 int size = sizeof *body;
/* Reserve a single reply buffer large enough for the body. */
78 result = lustre_pack_reply(req, 1, &size, NULL);
80 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
82 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
/* Reply buffer 0 holds the body; the child device fills in the root fid. */
85 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
86 result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
89 /* the last_committed and last_xid fields are filled in for all
90 * replies already - no need to do so here also.
// Handler registered for the CONNECT opcode: delegates to
// target_handle_connect(), passing mdt_handle as the request handler.
// The info and offset arguments are not used by the visible code.
95 static int mdt_connect(struct mdt_thread_info *info,
96 struct ptlrpc_request *req, int offset)
98 return target_handle_connect(req, mdt_handle);
// Request handlers whose bodies are not visible in this listing. Each one
// has the mh_act signature (info, req, offset) and is registered in
// mdt_mds_ops under the opcode matching its name: DISCONNECT, GETATTR,
// GETATTR_NAME, SETXATTR, GETXATTR, STATFS, READPAGE, REINT, CLOSE,
// DONE_WRITING, PIN, SYNC, SET_INFO, QUOTACHECK and QUOTACTL.
// NOTE(review): what each body returns cannot be confirmed from here.
101 static int mdt_disconnect(struct mdt_thread_info *info,
102 struct ptlrpc_request *req, int offset)
107 static int mdt_getattr(struct mdt_thread_info *info,
108 struct ptlrpc_request *req, int offset)
113 static int mdt_getattr_name(struct mdt_thread_info *info,
114 struct ptlrpc_request *req, int offset)
119 static int mdt_setxattr(struct mdt_thread_info *info,
120 struct ptlrpc_request *req, int offset)
125 static int mdt_getxattr(struct mdt_thread_info *info,
126 struct ptlrpc_request *req, int offset)
131 static int mdt_statfs(struct mdt_thread_info *info,
132 struct ptlrpc_request *req, int offset)
137 static int mdt_readpage(struct mdt_thread_info *info,
138 struct ptlrpc_request *req, int offset)
143 static int mdt_reint(struct mdt_thread_info *info,
144 struct ptlrpc_request *req, int offset)
149 static int mdt_close(struct mdt_thread_info *info,
150 struct ptlrpc_request *req, int offset)
155 static int mdt_done_writing(struct mdt_thread_info *info,
156 struct ptlrpc_request *req, int offset)
161 static int mdt_pin(struct mdt_thread_info *info,
162 struct ptlrpc_request *req, int offset)
167 static int mdt_sync(struct mdt_thread_info *info,
168 struct ptlrpc_request *req, int offset)
173 static int mdt_set_info(struct mdt_thread_info *info,
174 struct ptlrpc_request *req, int offset)
179 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
180 struct ptlrpc_request *req, int offset)
185 static int mdt_handle_quotactl(struct mdt_thread_info *info,
186 struct ptlrpc_request *req, int offset)
191 /* Issues a dlm lock in namespace @ns on the resource named after fid @f and
 * stores its lock handle into @lh. The resource id is built from fid_seq(f)
 * and fid_num(f); the lock is enqueued as LDLM_IBITS with the given @mode
 * and @policy. Returns 0 when ldlm_cli_enqueue() yields ELDLM_OK and -EIO
 * for any other result.
 * NOTE(review): the local declarations and the tail of the enqueue argument
 * list sit on lines that are not visible in this listing. */
192 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
193 struct lustre_handle *lh, ldlm_mode_t mode,
194 ldlm_policy_data_t *policy)
196 struct ldlm_res_id res_id = { .name = {0} };
204 /* we use fid_num() which also includes the object version instead of raw
206 res_id.name[0] = fid_seq(f);
207 res_id.name[1] = fid_num(f);
209 /* FIXME: is that correct to have @flags=0 here? */
210 rc = ldlm_cli_enqueue(NULL, NULL, ns, res_id, LDLM_IBITS, policy,
211 mode, &flags, ldlm_blocking_ast,
212 ldlm_completion_ast, NULL, NULL,
214 RETURN (rc == ELDLM_OK ? 0 : -EIO);
/*
 * Drops one reference of mode @mode on the lock behind @lh by calling
 * ldlm_lock_decref(). Before that, debug code resolves the handle with
 * ldlm_handle2lock() and asserts that the resource name of the lock matches
 * fid_seq(f) / fid_num(f), i.e. the naming used by fid_lock().
 *
 * NOTE(review): the condition guarding the CERROR and any release of the
 * looked-up lock are on lines not visible here. @ns is not used by the
 * visible code.
 */
217 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
218 struct lustre_handle *lh, ldlm_mode_t mode)
220 struct ldlm_lock *lock;
223 /* FIXME: this is debug stuff, remove it later. */
224 lock = ldlm_handle2lock(lh);
226 CERROR("invalid lock handle "LPX64, lh->cookie);
230 LASSERT(fid_seq(f) == lock->l_resource->lr_name.name[0] &&
231 fid_num(f) == lock->l_resource->lr_name.name[1]);
233 ldlm_lock_decref(lh, mode);
/* Forward declaration: lu_device_is_mdt() compares against its address before the table is defined. */
237 static struct lu_device_operations mdt_lu_ops;
/*
 * Tells whether @d belongs to this layer by comparing d->ld_ops with
 * &mdt_lu_ops. ergo(a, b) is presumably logical implication, in which case
 * a device whose ld_ops is still NULL is accepted as well -- confirm.
 */
239 static int lu_device_is_mdt(struct lu_device *d)
242 * XXX for now. Tags in lu_device_type->ldt_something are needed.
244 return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
/*
 * Converts a generic lu_object into the mdt_object that embeds it (member
 * mot_obj.mo_lu), after asserting that the object sits on an mdt device.
 */
247 static struct mdt_object *mdt_obj(struct lu_object *o)
249 LASSERT(lu_device_is_mdt(o->lo_dev));
250 return container_of(o, struct mdt_object, mot_obj.mo_lu);
/*
 * Looks up fid @f in the lu_site of device @d with lu_object_find().
 *
 * NOTE(review): only one return statement is visible and it passes o through
 * a plain cast; the condition selecting it and any other return are on
 * lines not shown. Callers in this file test the result with IS_ERR().
 */
253 struct mdt_object *mdt_object_find(struct mdt_device *d, struct lu_fid *f)
257 o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f);
259 return (struct mdt_object *)o;
/* Releases a reference on @o by calling lu_object_put() on its embedded lu_object. */
264 void mdt_object_put(struct mdt_object *o)
266 lu_object_put(&o->mot_obj.mo_lu);
/* Returns the fid of @o as reported by lu_object_fid() for its embedded lu_object. */
269 static struct lu_fid *mdt_object_fid(struct mdt_object *o)
271 return lu_object_fid(&o->mot_obj.mo_lu);
/*
 * Takes a dlm lock on the fid of @o in namespace @ns via fid_lock(), using
 * mode lh->mlh_mode and storing the handle in lh->mlh_lh.
 *
 * The caller must pass a handle that is not already in use and must have
 * set lh->mlh_mode to something other than LCK_MINMODE, which is the value
 * mdt_lock_handle_init() leaves behind; both are asserted.
 *
 * NOTE(review): the initialiser of policy p is not visible; @ibits is
 * presumably stored there -- confirm.
 */
274 static int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
275 struct mdt_lock_handle *lh, __u64 ibits)
277 ldlm_policy_data_t p = {
282 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
283 LASSERT(lh->mlh_mode != LCK_MINMODE);
285 return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
/*
 * Releases the lock recorded in @lh, if any. When the handle is in use the
 * lock is dropped through fid_unlock() and the cookie is reset to 0, the
 * same value mdt_lock_handle_init() stores. Calling this on an unused
 * handle does nothing.
 */
288 static void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
289 struct mdt_lock_handle *lh)
291 if (lustre_handle_is_used(&lh->mlh_lh)) {
292 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
293 lh->mlh_lh.cookie = 0;
/*
 * Finds the object for fid @f on device @d and locks it in the namespace of
 * the device with mdt_object_lock(), passing @lh and @ibits through.
 *
 * NOTE(review): the checks between the two steps, the handling of a failed
 * lock and the return statement are on lines not visible here. mdt_mkdir()
 * is the only caller in this file.
 */
297 struct mdt_object *mdt_object_find_lock(struct mdt_device *d, struct lu_fid *f,
298 struct mdt_lock_handle *lh, __u64 ibits)
300 struct mdt_object *o;
302 o = mdt_object_find(d, f);
306 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
/*
 * Callback member of the handler descriptor. mdt_req_handle() invokes it as
 * h->mh_act(info, req, off). The struct header and the other members used
 * in this file (mh_name, mh_fail_id, mh_opc, mh_flags) are on lines not
 * visible in this listing.
 */
320 int (*mh_act)(struct mdt_thread_info *info,
321 struct ptlrpc_request *req, int offset);
/* Bits for mh_flags; mdt_req_handle() tests them before calling the handler. */
324 enum mdt_handler_flags {
326 * struct mdt_body is passed in the 0-th incoming buffer.
328 HABEO_CORPUS = (1 << 0)
/*
 * One contiguous opcode range and its handler table. mdt_handler_find()
 * treats the range as mos_opc_start <= opc < mos_opc_end and indexes mos_hs
 * by (opc - mos_opc_start). The two range members are declared on lines not
 * visible here.
 */
331 struct mdt_opc_slice {
334 struct mdt_handler *mos_hs;
/* Forward declaration; the slice table itself is defined near the end of the file. */
337 static struct mdt_opc_slice mdt_handlers[];
/*
 * Maps opcode @opc to its handler descriptor. The slices of mdt_handlers
 * are scanned until an entry with mos_hs == NULL; within the slice whose
 * range contains @opc the descriptor is located by offset from
 * mos_opc_start, and its mh_opc is asserted to equal @opc. h is set to NULL
 * for an unsupported opcode.
 *
 * NOTE(review): the else/break lines and the return statement are not
 * visible in this listing, so the exact control flow cannot be confirmed.
 */
339 struct mdt_handler *mdt_handler_find(__u32 opc)
341 struct mdt_opc_slice *s;
342 struct mdt_handler *h;
345 for (s = mdt_handlers; s->mos_hs != NULL; s++) {
346 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
347 h = s->mos_hs + (opc - s->mos_opc_start);
349 LASSERT(h->mh_opc == opc);
351 h = NULL; /* unsupported opc */
/*
 * Returns mcd_last_xid from the client data attached to the export of
 * @req. req->rq_export is dereferenced without a check, so callers must
 * only use this on requests that have an export. mdt_req_handle() applies
 * le64_to_cpu() to the result, so the value is presumably kept
 * little-endian -- confirm.
 */
358 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
360 return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
/*
 * Steps visible below: assert the descriptor matches the request opcode and
 * that no journal handle is open; apply fault injection when mh_fail_id is
 * set; for HABEO_CORPUS handlers unpack the mdt_body and look up the object
 * named by body->fid1; call h->mh_act(); copy the result into rq_status;
 * and, on success for anything but MDS_DISCONNECT, record last_xid and the
 * committed transaction number.
 *
 * NOTE(review): the last parameter (presumably shift), local declarations,
 * else branches and the return statement are on lines not visible here.
 */
364 * Invoke handler for this request opc. Also do necessary preprocessing
365 * (according to handler ->mh_flags), and post-processing (setting of
366 * ->last_{xid,committed}).
368 static int mdt_req_handle(struct mdt_thread_info *info,
369 struct mdt_handler *h, struct ptlrpc_request *req,
377 LASSERT(h->mh_act != NULL);
378 LASSERT(h->mh_opc == req->rq_reqmsg->opc);
379 LASSERT(current->journal_info == NULL);
381 DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
/* A non-zero mh_fail_id arms fault injection for this handler. */
383 if (h->mh_fail_id != 0)
384 OBD_FAIL_RETURN(h->mh_fail_id, 0);
/* Buffer offset handed to the unpacking code and to the handler. */
386 off = MDS_REQ_REC_OFF + shift;
/* HABEO_CORPUS: unpack the body at off and find the object it names. */
388 if (h->mh_flags & HABEO_CORPUS) {
389 info->mti_body = lustre_swab_reqbuf(req, off,
390 sizeof *info->mti_body,
391 lustre_swab_mdt_body);
392 if (info->mti_body == NULL) {
393 CERROR("Can't unpack body\n");
394 result = req->rq_status = -EFAULT;
/* The reference taken here is dropped by mdt_thread_info_fini(). */
396 info->mti_object = mdt_object_find(info->mti_mdt,
397 &info->mti_body->fid1);
398 if (IS_ERR(info->mti_object))
399 result = PTR_ERR(info->mti_object);
405 result = h->mh_act(info, req, off);
407 * XXX result value is unconditionally shoved into ->rq_status
408 * (original code sometimes placed error code into ->rq_status, and
409 * sometimes returned it to the
410 * caller). ptlrpc_server_handle_request() doesn't check return value
413 req->rq_status = result;
415 LASSERT(current->journal_info == NULL);
417 /* If we're DISCONNECTing, the mds_export_data is already freed */
418 if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
/*
 * NOTE(review): last_xid is stored into rq_reqmsg, while the comment at
 * the end of mdt_getstatus() speaks of fields filled in for replies --
 * confirm whether rq_repmsg was intended.
 */
419 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
420 target_committed_to_req(req);
/*
 * Puts @lh into its initial state: zero cookie and mode LCK_MINMODE. A
 * caller must replace the mode before locking, because mdt_object_lock()
 * asserts that it is not LCK_MINMODE.
 */
425 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
427 lh->mlh_lh.cookie = 0ull;
428 lh->mlh_mode = LCK_MINMODE;
/* Asserts that the lock recorded in @lh has been released; nothing else is done in the visible code. */
431 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
433 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
/*
 * Resets the per-thread request context before a request is handled: the
 * whole structure is zeroed, the default fail id is set, every reply buffer
 * size is set to ~0, and all lock handles are initialised.
 */
436 static void mdt_thread_info_init(struct mdt_thread_info *info)
440 memset(info, 0, sizeof *info);
441 info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
445 for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
446 info->mti_rep_buf_size[i] = ~0;
/* i equals the array size once the loop above has finished. */
447 info->mti_rep_buf_nr = i;
448 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
449 mdt_lock_handle_init(&info->mti_lh[i]);
/*
 * Cleans the per-thread context after a request: drops the object reference
 * taken for HABEO_CORPUS handlers, if one is held, and verifies through
 * mdt_lock_handle_fini() that every lock handle has been released.
 */
452 static void mdt_thread_info_fini(struct mdt_thread_info *info)
456 if (info->mti_object != NULL) {
457 mdt_object_put(info->mti_object);
458 info->mti_object = NULL;
460 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
461 mdt_lock_handle_fini(&info->mti_lh[i]);
/*
 * Checks the version of @msg against the constant expected for its opcode
 * group. Four groups are visible, checked against LUSTRE_OBD_VERSION,
 * LUSTRE_MDS_VERSION, LUSTRE_DLM_VERSION and LUSTRE_LOG_VERSION; each has a
 * CERROR reporting the opcode, the version found and the version expected.
 * An opcode outside all groups is reported as unknown.
 *
 * NOTE(review): most case labels, the conditions guarding the CERROR calls
 * and the return value are on lines not visible in this listing.
 */
464 static int mds_msg_check_version(struct lustre_msg *msg)
468 /* TODO: enable the below check while really introducing msg version.
469 * it's disabled because it will break compatibility with b1_4.
477 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
479 CERROR("bad opc %u version %08x, expecting %08x\n",
480 msg->opc, msg->version, LUSTRE_OBD_VERSION);
484 case MDS_GETATTR_NAME:
489 case MDS_DONE_WRITING:
499 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
501 CERROR("bad opc %u version %08x, expecting %08x\n",
502 msg->opc, msg->version, LUSTRE_MDS_VERSION);
506 case LDLM_BL_CALLBACK:
507 case LDLM_CP_CALLBACK:
508 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
510 CERROR("bad opc %u version %08x, expecting %08x\n",
511 msg->opc, msg->version, LUSTRE_DLM_VERSION);
514 case LLOG_ORIGIN_HANDLE_CREATE:
515 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
516 case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
517 case LLOG_ORIGIN_HANDLE_READ_HEADER:
518 case LLOG_ORIGIN_HANDLE_CLOSE:
520 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
522 CERROR("bad opc %u version %08x, expecting %08x\n",
523 msg->opc, msg->version, LUSTRE_LOG_VERSION);
526 CERROR("MDS unknown opcode %d\n", msg->opc);
// Decides by opcode how a request that arrives during recovery is treated.
// Two outcomes are visible: the request is handed to
// target_queue_recovery_request() with the result stored in *process, or
// it is refused by setting rq_status to -EAGAIN and returning
// ptlrpc_error(req).
// NOTE(review): several case labels and the statements between them are
// on lines not visible here, so the opcode-to-outcome mapping is partial.
532 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
533 struct obd_device *obd, int *process)
535 switch (req->rq_reqmsg->opc) {
536 case MDS_CONNECT: /* This will never get here, but for completeness. */
537 case OST_CONNECT: /* This will never get here, but for completeness. */
544 case MDS_SYNC: /* used in unmounting */
548 *process = target_queue_recovery_request(req, obd);
552 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
554 /* XXX what should we set rq_status to here? */
555 req->rq_status = -EAGAIN;
556 RETURN(ptlrpc_error(req));
/*
 * Recovery gate run by mdt_handle0() before a request is dispatched.
 * MDS_CONNECT is tested first; any other request without an export gets
 * rq_status = -ENOTCONN. For the rest the recovery state of the obd device
 * is sampled under obd_processing_task_lock and the request is either
 * passed on, used to trigger target_abort_recovery(), or filtered through
 * mdt_filter_recovery_request().
 *
 * NOTE(review): the return statements and some local declarations are on
 * lines not visible here; the return convention is the one listed below.
 */
561 * Handle recovery. Return:
562 * +ve: continue request processing;
563 * -ve: abort immediately with given (negated) error code;
564 * 0: send reply with error code in req->rq_status;
566 static int mdt_recovery(struct ptlrpc_request *req)
570 struct obd_device *obd;
574 if (req->rq_reqmsg->opc == MDS_CONNECT)
577 if (req->rq_export == NULL) {
578 CERROR("operation %d on unconnected MDS from %s\n",
580 libcfs_id2str(req->rq_peer));
581 req->rq_status = -ENOTCONN;
585 /* sanity check: if the xid matches, the request must be marked as a
586 * resent or replayed */
587 LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
588 lustre_msg_get_flags(req->rq_reqmsg) &
589 (MSG_RESENT | MSG_REPLAY)),
590 "rq_xid "LPU64" matches last_xid, "
591 "expected RESENT flag\n", req->rq_xid);
593 /* else: note the opposite is not always true; a RESENT req after a
594 * failover will usually not match the last_xid, since it was likely
595 * never committed. A REPLAYed request will almost never match the
596 * last xid, however it could for a committed, but still retained,
599 obd = req->rq_export->exp_obd;
601 /* Check for aborted recovery... */
/* Both flags are read in one critical section so they stay consistent. */
602 spin_lock_bh(&obd->obd_processing_task_lock);
603 abort_recovery = obd->obd_abort_recovery;
604 recovering = obd->obd_recovering;
605 spin_unlock_bh(&obd->obd_processing_task_lock);
606 if (abort_recovery) {
607 target_abort_recovery(obd);
608 } else if (recovering) {
612 rc = mdt_filter_recovery_request(req, obd, &should_process);
613 if (rc != 0 || !should_process) {
/*
 * Sends the reply for @req. A request flagged MSG_LAST_REPLAY is logged as
 * unexpected unless it is an OBD_PING; if the device behind its export is
 * still recovering the reply is queued with target_queue_final_reply(),
 * otherwise rq_status becomes -ENOTCONN. On the normal path the reply is
 * sent with target_send_reply() using info->mti_fail_id and rq_status is
 * returned.
 *
 * NOTE(review): the closing braces of the MSG_LAST_REPLAY branch are not
 * visible, so the exact nesting of the final two statements is inferred.
 */
621 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
623 struct obd_device *obd;
625 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
626 if (req->rq_reqmsg->opc != OBD_PING)
627 DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
629 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
630 if (obd && obd->obd_recovering) {
631 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
632 RETURN(target_queue_final_reply(req, req->rq_status));
634 /* Lost a race with recovery; let the error path
636 req->rq_status = -ENOTCONN;
639 target_send_reply(req, req->rq_status, info->mti_fail_id);
640 RETURN(req->rq_status);
// Core of request processing, called from mdt_handle(). Visible steps:
// fault injection for OBD_FAIL_MDS_ALL_REQUEST_NET, a version check of the
// request message, the recovery gate mdt_recovery(), lookup of the handler
// for msg->opc and dispatch through mdt_req_handle() with shift 0. When no
// handler is found rq_status is set to -ENOTSUPP and ptlrpc_error() is
// called. The function ends by returning mdt_reply().
// NOTE(review): the conditions on result and the early returns are on
// lines not visible here, so which paths reach mdt_reply() is unconfirmed.
643 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
645 struct lustre_msg *msg;
650 OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
652 LASSERT(current->journal_info == NULL);
654 msg = req->rq_reqmsg;
655 result = mds_msg_check_version(msg);
657 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
661 result = mdt_recovery(req);
663 struct mdt_handler *h;
665 h = mdt_handler_find(msg->opc);
667 result = mdt_req_handle(info, h, req, 0);
669 req->rq_status = -ENOTSUPP;
670 result = ptlrpc_error(req);
673 } else if (result < 0)
676 RETURN(mdt_reply(req, info));
// Converts a generic lu_device into the mdt_device that embeds it (member
// mdt_md_dev.md_lu_dev), after asserting that it is an mdt device.
679 static struct mdt_device *mdt_dev(struct lu_device *d)
681 LASSERT(lu_device_is_mdt(d));
682 return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
// Service entry point handed to ptlrpc_init_svc_conf() and to
// target_handle_connect(). It fetches the per-thread mdt_thread_info,
// resets it, binds it to the mdt device of the export, runs mdt_handle0()
// and finally cleans the thread info.
// NOTE(review): req->rq_export is dereferenced here before mdt_recovery()
// gets to test it against NULL -- confirm that every request reaching this
// point, connect requests included, already has an export.
685 static int mdt_handle(struct ptlrpc_request *req)
689 struct mdt_thread_info *info = ptlrpc_thread_key_get(req->rq_svc_thread,
691 mdt_thread_info_init(info);
692 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
694 result = mdt_handle0(req, info);
696 mdt_thread_info_fini(info);
// Intent policy callback registered on the namespace by mdt_init0(). The
// only visible statement returns ELDLM_LOCK_ABORTED; none of the arguments
// are used on the visible lines.
700 static int mdt_intent_policy(struct ldlm_namespace *ns,
701 struct ldlm_lock **lockp, void *req_cookie,
702 ldlm_mode_t mode, int flags, void *data)
705 RETURN(ELDLM_LOCK_ABORTED);
// Convenience wrapper that unpacks the fields of a ptlrpc_service_conf
// into the positional arguments of ptlrpc_init_svc() and returns its
// result.
// NOTE(review): one argument line of the call is not visible; it
// presumably forwards h, name and proc_entry -- confirm.
708 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
709 svc_handler_t h, char *name,
710 struct proc_dir_entry *proc_entry,
711 svcreq_printfn_t prntfn)
713 return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
714 c->psc_max_req_size, c->psc_max_reply_size,
715 c->psc_req_portal, c->psc_rep_portal,
716 c->psc_watchdog_timeout,
718 prntfn, c->psc_num_threads);
// Initialises the lu_device embedded in @md for device type @t and returns
// the result of lu_device_init().
721 int md_device_init(struct md_device *md, struct lu_device_type *t)
723 return lu_device_init(&md->md_lu_dev, t);
// Finalises the lu_device embedded in @md; counterpart of md_device_init().
726 void md_device_fini(struct md_device *md)
728 lu_device_fini(&md->md_lu_dev);
// Tears down an mdt device. Each resource is released only if it is set:
// the lu_site is finalised, the ptlrpc service is unregistered and the
// dlm namespace is freed, with the latter two pointers reset to NULL.
// The device reference count is asserted to be zero before the embedded
// md device is finalised.
// NOTE(review): whether d->ld_site is reset after lu_site_fini() is on a
// line not visible here.
731 static void mdt_fini(struct lu_device *d)
733 struct mdt_device *m = mdt_dev(d);
735 if (d->ld_site != NULL) {
736 lu_site_fini(d->ld_site);
739 if (m->mdt_service != NULL) {
740 ptlrpc_unregister_service(m->mdt_service);
741 m->mdt_service = NULL;
743 if (m->mdt_namespace != NULL) {
744 ldlm_namespace_free(m->mdt_namespace, 0);
745 m->mdt_namespace = NULL;
748 LASSERT(atomic_read(&d->ld_ref) == 0);
749 md_device_fini(&m->mdt_md_dev);
// Sets up an mdt device: initialises the embedded md device and installs
// mdt_lu_ops, fills the service configuration from the MDS_* constants,
// initialises the lu_site, creates a server-side dlm namespace named
// after the device pointer and registers mdt_intent_policy on it, sets up
// the ldlm callback client, creates the ptlrpc service with mdt_handle as
// handler, and finally starts the service threads.
// NOTE(review): local declarations, the allocation of s, the upper and
// lower bounds applied to mdt_num_threads, the assignment target of
// ptlrpc_init_svc_conf() and all error returns are on lines not visible
// here. The return value of md_device_init() is not checked on the
// visible line.
752 static int mdt_init0(struct mdt_device *m,
753 struct lu_device_type *t, struct lustre_cfg *cfg)
764 md_device_init(&m->mdt_md_dev, t);
766 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
768 m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
769 m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE;
770 m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE;
771 m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE;
772 m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL;
773 m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL;
774 m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
776 * We'd like to have a mechanism to set this on a per-device basis,
779 m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
782 lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
784 snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
785 m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
786 if (m->mdt_namespace == NULL)
788 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
790 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
791 "mdt_ldlm_client", &m->mdt_ldlm_client);
794 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
796 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
798 if (m->mdt_service == NULL)
801 return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
// ldo_object_alloc method of mdt_lu_ops. On the visible path the object
// header is initialised, the lu_object embedded in the new mdt_object is
// initialised against that header and device @d, and the object is added
// as the top layer of the header.
// NOTE(review): the allocation of mo, the assignment of h and the return
// statements are on lines not visible here.
804 struct lu_object *mdt_object_alloc(struct lu_device *d)
806 struct mdt_object *mo;
811 struct lu_object_header *h;
813 o = &mo->mot_obj.mo_lu;
815 lu_object_header_init(h);
816 lu_object_init(o, h, d);
817 /* ->lo_depth and ->lo_flags are automatically 0 */
818 lu_object_add_top(h, o);
/*
 * ldo_object_init method of mdt_lu_ops. It asks the child device of the
 * mdt device to allocate the next-lower object and links it below @o with
 * lu_object_add().
 *
 * NOTE(review): the check of below and the return statements are on lines
 * not visible in this listing.
 */
824 int mdt_object_init(struct lu_object *o)
826 struct mdt_device *d = mdt_dev(o->lo_dev);
827 struct lu_device *under;
828 struct lu_object *below;
830 under = &d->mdt_child->md_lu_dev;
831 below = under->ld_ops->ldo_object_alloc(under);
833 lu_object_add(o, below);
/*
 * ldo_object_free method of mdt_lu_ops. The only visible action finalises
 * the object header; how h is obtained and whether the memory is freed are
 * on lines not visible in this listing.
 */
839 void mdt_object_free(struct lu_object *o)
841 struct lu_object_header *h;
845 lu_object_header_fini(h);
/* ldo_object_release method of mdt_lu_ops; its body is not visible in this listing. */
848 void mdt_object_release(struct lu_object *o)
/*
 * ldo_object_print method of mdt_lu_ops: writes the layer name followed by
 * the object address to @f and returns the seq_printf() result.
 */
852 int mdt_object_print(struct seq_file *f, const struct lu_object *o)
854 return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
/*
 * Object methods of the mdt layer. The address of this table is also what
 * lu_device_is_mdt() uses to recognise an mdt device.
 */
857 static struct lu_device_operations mdt_lu_ops = {
858 .ldo_object_alloc = mdt_object_alloc,
859 .ldo_object_init = mdt_object_init,
860 .ldo_object_free = mdt_object_free,
861 .ldo_object_release = mdt_object_release,
862 .ldo_object_print = mdt_object_print
/*
 * Returns the md_object of the layer directly below @o: the result of
 * lu_object_next() on the embedded lu_object, converted with lu2md().
 */
865 struct md_object *mdt_object_child(struct mdt_object *o)
867 return lu2md(lu_object_next(&o->mot_obj.mo_lu));
/* Shorthand for the operation table of the child md device of @d. */
870 static inline struct md_device_operations *mdt_child_ops(struct mdt_device *d)
872 return d->mdt_child->md_ops;
/*
 * Creates directory @name under the parent identified by @pfid, using
 * @cfid for the new child. The parent is found and locked in LCK_PW mode
 * with MDS_INODELOCK_UPDATE through the MDT_LH_PARENT lock slot of @info.
 * The child object is then looked up; if that succeeds mdo_mkdir() of the
 * child device is called on the two lower-layer objects and the child
 * reference is dropped, otherwise the lookup error becomes the result.
 * The parent lock is released afterwards.
 *
 * NOTE(review): the error check after mdt_object_find_lock(), the release
 * of the parent reference and the return statement are on lines not
 * visible in this listing.
 */
875 int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
876 struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
878 struct mdt_object *o;
879 struct mdt_object *child;
880 struct mdt_lock_handle *lh;
/* mdt_object_lock() asserts the mode was set, so choose it before locking. */
884 lh = &info->mti_lh[MDT_LH_PARENT];
885 lh->mlh_mode = LCK_PW;
887 o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
891 child = mdt_object_find(d, cfid);
892 if (!IS_ERR(child)) {
893 result = mdt_child_ops(d)->mdo_mkdir(mdt_object_child(o), name,
894 mdt_object_child(child));
895 mdt_object_put(child);
897 result = PTR_ERR(child);
898 mdt_object_unlock(d->mdt_namespace, o, lh);
/* obd method table passed to class_register_type(); only the owner is set on the visible lines. */
903 static struct obd_ops mdt_obd_device_ops = {
904 .o_owner = THIS_MODULE
/*
 * ldto_device_alloc method: produces a new mdt device for type @t. On the
 * visible path the embedded lu_device is taken from the new mdt_device and
 * mdt_init0() is run on it; ERR_PTR(-ENOMEM) is the value used when the
 * device cannot be provided.
 *
 * NOTE(review): the allocation itself, the handling of a failing
 * mdt_init0() and the return statement are on lines not visible here.
 */
907 struct lu_device *mdt_device_alloc(struct lu_device_type *t,
908 struct lustre_cfg *cfg)
911 struct mdt_device *m;
917 l = &m->mdt_md_dev.md_lu_dev;
918 result = mdt_init0(m, t, cfg);
924 l = ERR_PTR(-ENOMEM);
/* ldto_device_free method; its body is not visible in this listing. */
928 void mdt_device_free(struct lu_device *m)
/*
 * ptk_init callback of mdt_thread_key: allocates the per-thread
 * mdt_thread_info and returns it, or ERR_PTR(-ENOMEM) when the allocation
 * yields NULL. @t is not used on the visible lines.
 *
 * NOTE(review): this relies on OBD_ALLOC_PTR(info) being usable as an
 * expression that evaluates to the pointer, and on the GNU ?: extension
 * with an omitted middle operand -- confirm against the macro definition.
 */
934 void *mdt_thread_init(struct ptlrpc_thread *t)
936 struct mdt_thread_info *info;
938 return OBD_ALLOC_PTR(info) ? : ERR_PTR(-ENOMEM);
/*
 * ptk_fini callback of mdt_thread_key. @data is the pointer produced by
 * mdt_thread_init(); the statement that releases it is on a line not
 * visible in this listing.
 */
941 void mdt_thread_fini(struct ptlrpc_thread *t, void *data)
943 struct mdt_thread_info *info = data;
/*
 * Per-thread key: mdt_type_init() registers it and mdt_handle() uses it to
 * fetch the mdt_thread_info of the serving thread.
 */
947 static struct ptlrpc_thread_key mdt_thread_key = {
948 .ptk_init = mdt_thread_init,
949 .ptk_fini = mdt_thread_fini
/* ldto_init method: registers mdt_thread_key and returns the registration result. */
952 int mdt_type_init(struct lu_device_type *t)
954 return ptlrpc_thread_key_register(&mdt_thread_key);
/* ldto_fini method; its body is not visible in this listing. */
957 void mdt_type_fini(struct lu_device_type *t)
/* Type-level methods: type setup and teardown plus device allocation and release. */
961 static struct lu_device_type_operations mdt_device_type_ops = {
962 .ldto_init = mdt_type_init,
963 .ldto_fini = mdt_type_fini,
965 .ldto_device_alloc = mdt_device_alloc,
966 .ldto_device_free = mdt_device_free
/* Device type descriptor that mdt_mod_init() attaches to the registered obd type as typ_lu. */
969 static struct lu_device_type mdt_device_type = {
970 .ldt_tags = LU_DEVICE_MD,
971 .ldt_name = LUSTRE_MDT0_NAME,
972 .ldt_ops = &mdt_device_type_ops
/*
 * lprocfs variable tables for the obd and module level. Their entries are
 * on lines not visible in this listing. LPROCFS_INIT_VARS ties both tables
 * to the name mdt, which mdt_mod_init() passes to lprocfs_init_vars().
 */
975 struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
979 struct lprocfs_vars lprocfs_mdt_module_vars[] = {
983 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
/*
 * Module entry point. It sets the thread count, registers the obd type
 * under LUSTRE_MDT0_NAME, looks the type up again to attach
 * mdt_device_type as typ_lu, and runs the ldto_init method of the type.
 * The visible class_unregister_type() call is presumably the rollback for
 * a failing ldto_init -- the surrounding conditions are not visible.
 *
 * NOTE(review): mdt_num_threads is also exported as a module parameter at
 * the end of the file; assigning MDT_NUM_THREADS here appears to overwrite
 * any value supplied at load time -- confirm whether that is intended.
 */
985 static int __init mdt_mod_init(void)
987 struct lprocfs_static_vars lvars;
988 struct obd_type *type;
991 mdt_num_threads = MDT_NUM_THREADS;
992 lprocfs_init_vars(mdt, &lvars);
993 result = class_register_type(&mdt_obd_device_ops,
994 lvars.module_vars, LUSTRE_MDT0_NAME);
996 type = class_get_type(LUSTRE_MDT0_NAME);
997 LASSERT(type != NULL);
998 type->typ_lu = &mdt_device_type;
999 result = type->typ_lu->ldt_ops->ldto_init(type->typ_lu);
1001 class_unregister_type(LUSTRE_MDT0_NAME);
/*
 * Module exit point: unregisters the obd type registered by mdt_mod_init().
 * NOTE(review): no call to the ldto_fini method is visible here -- confirm
 * whether the type teardown happens elsewhere.
 */
1006 static void __exit mdt_mod_exit(void)
1008 class_unregister_type(LUSTRE_MDT0_NAME);
/*
 * DEF_HNDL builds one designated-initialiser entry of a handler table. The
 * array index is the opcode relative to the first opcode of the slice
 * (prefix_opc - prefix_base), the fail id is OBD_FAIL_prefix_opc_NET, and
 * mh_opc and mh_flags are taken from the arguments. The lines setting the
 * remaining members are not visible in this listing.
 *
 * DEF_MDT_HNDL specialises it for the MDS opcode range, whose base opcode
 * is MDS_GETATTR, matching mos_opc_start of the first mdt_handlers slice.
 */
1012 #define DEF_HNDL(prefix, base, flags, opc, fn) \
1013 [prefix ## _ ## opc - prefix ## _ ## base] = { \
1015 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## _NET, \
1016 .mh_opc = prefix ## _ ## opc, \
1017 .mh_flags = flags, \
1021 #define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn)
/*
 * Handler table for the MDS opcode slice. Entries flagged HABEO_CORPUS have
 * their mdt_body unpacked and the target object looked up by
 * mdt_req_handle() before the handler runs; entries with flags 0 get the
 * request untouched.
 */
1023 static struct mdt_handler mdt_mds_ops[] = {
1024 DEF_MDT_HNDL(0, CONNECT, mdt_connect),
1025 DEF_MDT_HNDL(0, DISCONNECT, mdt_disconnect),
1026 DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus),
1027 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mdt_getattr),
1028 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mdt_getattr_name),
1029 DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mdt_setxattr),
1030 DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mdt_getxattr),
1031 DEF_MDT_HNDL(0, STATFS, mdt_statfs),
1032 DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mdt_readpage),
1033 DEF_MDT_HNDL(0, REINT, mdt_reint),
1034 DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mdt_close),
1035 DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
1036 DEF_MDT_HNDL(0, PIN, mdt_pin),
1037 DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync),
1038 DEF_MDT_HNDL(0, SET_INFO, mdt_set_info),
1039 DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck),
1040 DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl),
/*
 * Handler tables for the OBD, DLM and LLOG opcode slices. No entries are
 * visible in this listing.
 * NOTE(review): if the tables are really empty, mdt_handler_find() would
 * index past them for any opcode in these ranges -- confirm.
 */
1043 static struct mdt_handler mdt_obd_ops[] = {
1046 static struct mdt_handler mdt_dlm_ops[] = {
1049 static struct mdt_handler mdt_llog_ops[] = {
/*
 * Opcode slices searched by mdt_handler_find(). Each slice covers
 * mos_opc_start up to but excluding mos_opc_end. The search stops at an
 * entry whose mos_hs is NULL; that terminating entry is on lines not
 * visible in this listing.
 */
1052 static struct mdt_opc_slice mdt_handlers[] = {
1054 .mos_opc_start = MDS_GETATTR,
1055 .mos_opc_end = MDS_LAST_OPC,
1056 .mos_hs = mdt_mds_ops
1059 .mos_opc_start = OBD_PING,
1060 .mos_opc_end = OBD_LAST_OPC,
1061 .mos_hs = mdt_obd_ops
1064 .mos_opc_start = LDLM_ENQUEUE,
1065 .mos_opc_end = LDLM_LAST_OPC,
1066 .mos_hs = mdt_dlm_ops
1069 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1070 .mos_opc_end = LLOG_LAST_OPC,
1071 .mos_hs = mdt_llog_ops
/* Module metadata, the read-only (0444) thread-count parameter, and the init/exit hooks. */
1078 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1079 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1080 MODULE_LICENSE("GPL");
1082 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1083 "number of mdt service threads to start");
1085 cfs_module(mdt, "0.0.2", mdt_mod_init, mdt_mod_exit);