1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
14 * This file is part of the Lustre file system, http://www.lustre.org
15 * Lustre is a trademark of Cluster File Systems, Inc.
17 * You may have signed or agreed to another license before downloading
18 * this software. If so, you are bound by the terms and conditions
19 * of that agreement, and the following does not apply to you. See the
20 * LICENSE file included with this distribution for more information.
22 * If you did not agree to a different license, then this copy of Lustre
23 * is open source software; you can redistribute it and/or modify it
24 * under the terms of version 2 of the GNU General Public License as
25 * published by the Free Software Foundation.
27 * In either case, Lustre is distributed in the hope that it will be
28 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 * license text for more details.
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_MDS
38 #include <linux/module.h>
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
43 * struct OBD_{ALLOC,FREE}*()
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
54 #include <linux/dt_object.h>
56 /*LUSTRE_POSIX_ACL_MAX_SIZE*/
57 #include <linux/lustre_acl.h>
60 /* struct mds_client_data */
61 #include "../mds/mds_internal.h"
62 #include "mdt_internal.h"
/* Tunable thread count for the MDT ptlrpc service (see mdt_start_ptlrpc_service).
 * NOTE(review): this listing is lossy — original source lines are elided
 * throughout, so surrounding context is incomplete. */
65 * Initialized in mdt_mod_init().
67 unsigned long mdt_num_threads;
/* Forward declarations: the top-level request handler and object helpers. */
69 static int mdt_handle (struct ptlrpc_request *req);
70 static struct mdt_device *mdt_dev (struct lu_device *d);
71 static struct lu_fid *mdt_object_fid(struct mdt_object *o);
/* Per-service-thread context key and lu_object operations vector. */
73 static struct lu_context_key mdt_thread_key;
74 static struct lu_object_operations mdt_obj_ops;
/* MDS_GETSTATUS handler: pack a reply holding an mdt_body and fill its fid1
 * with the filesystem root fid obtained from the child md device.
 * NOTE(review): body lines are elided from this listing (no braces visible). */
76 static int mdt_getstatus(struct mdt_thread_info *info,
77 struct ptlrpc_request *req, int offset)
79 struct md_device *next = info->mti_mdt->mdt_child;
84 info->mti_rep_buf_size[0] = sizeof (struct mdt_body);
85 result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
87 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
88 sizeof (struct mdt_body));
89 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
/* Reply packed OK: locate buffer 0 and ask the child for the root fid. */
92 info->mti_body = lustre_msg_buf(req->rq_repmsg, 0,
93 sizeof (struct mdt_body));
94 result = next->md_ops->mdo_root_get(info->mti_ctxt,
96 &info->mti_body->fid1);
99 /* the last_committed and last_xid fields are filled in for all
100 * replies already - no need to do so here also.
/* MDS_STATFS handler: delegate to the child device's mdo_statfs and pack the
 * result as an obd_statfs in reply buffer 0.
 * NOTE(review): lines elided from this listing; `sfs`/`result` declarations
 * are not visible here. */
105 static int mdt_statfs(struct mdt_thread_info *info,
106 struct ptlrpc_request *req, int offset)
108 struct md_device *next = info->mti_mdt->mdt_child;
109 struct obd_statfs *osfs;
115 info->mti_rep_buf_size[0] = sizeof(struct obd_statfs);
116 result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
118 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
119 sizeof(struct obd_statfs));
120 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
121 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
124 osfs = lustre_msg_buf(req->rq_repmsg, 0,
125 sizeof(struct obd_statfs));
129 /* XXX max_age optimisation is needed here. See mds_statfs */
130 result = next->md_ops->mdo_statfs(info->mti_ctxt, next, sfs);
/* Convert internal statfs representation into the wire obd_statfs. */
131 statfs_pack(osfs, sfs);
/* Copy lu_attr fields into the wire mdt_body and set the OBD_MD_* valid bits.
 * Size/blocks/atime are advertised only for non-regular files here — regular
 * file size lives on the OSTs (same convention as mds_pack_inode2body). */
138 static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
140 b->valid |= OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID |
141 OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
142 OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
144 if (!S_ISREG(attr->la_mode))
145 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
/* NOTE(review): continuation of the valid-bit OR is elided in this listing. */
148 b->atime = attr->la_atime;
149 b->mtime = attr->la_mtime;
150 b->ctime = attr->la_ctime;
151 b->mode = attr->la_mode;
152 b->size = attr->la_size;
153 b->blocks = attr->la_blocks;
154 b->uid = attr->la_uid;
155 b->gid = attr->la_gid;
156 b->flags = attr->la_flags;
157 b->nlink = attr->la_nlink;
/* MDS_GETATTR handler: fetch attributes of the already-resolved object
 * (info->mti_object, set up in mdt_req_handle via HABEO_CORPUS) and pack
 * them into an mdt_body reply. */
160 static int mdt_getattr(struct mdt_thread_info *info,
161 struct ptlrpc_request *req, int offset)
165 LASSERT(info->mti_object != NULL);
169 info->mti_rep_buf_size[0] = sizeof(struct mdt_body);
170 result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
172 CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n",
173 sizeof(struct mdt_body), result);
/* NOTE(review): the error string below says "statfs" but this is the
 * getattr path — looks copy-pasted from mdt_statfs; fix at source. */
174 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
175 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
178 struct md_object *next = mdt_object_child(info->mti_object);
180 result = next->mo_ops->moo_attr_get(info->mti_ctxt, next,
183 info->mti_body = lustre_msg_buf(req->rq_repmsg, 0,
184 sizeof(struct mdt_body));
185 mdt_pack_attr2body(info->mti_body, &info->mti_attr);
186 info->mti_body->fid1 = *mdt_object_fid(info->mti_object);
192 static struct lu_device_operations mdt_lu_ops;
/* True iff @d is an MDT device, recognised by its ops vector. */
194 static int lu_device_is_mdt(struct lu_device *d)
197 * XXX for now. Tags in lu_device_type->ldt_something are needed.
199 return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
/* Downcast a generic lu_device to its enclosing mdt_device. */
202 static struct mdt_device *mdt_dev(struct lu_device *d)
204 LASSERT(lu_device_is_mdt(d));
205 return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
/* MDS_CONNECT handler: let the generic target code establish the export,
 * then record our mdt_device and allocate a fid sequence for the client. */
208 static int mdt_connect(struct mdt_thread_info *info,
209 struct ptlrpc_request *req, int offset)
213 result = target_handle_connect(req, mdt_handle);
215 struct obd_connect_data *data;
217 LASSERT(req->rq_export != NULL);
218 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
220 data = lustre_msg_buf(req->rq_repmsg, 0, sizeof *data);
221 result = seq_mgr_alloc(info->mti_ctxt,
222 info->mti_mdt->mdt_seq_mgr,
/* MDS_DISCONNECT handler: generic target disconnect, nothing MDT-specific. */
228 static int mdt_disconnect(struct mdt_thread_info *info,
229 struct ptlrpc_request *req, int offset)
231 return target_handle_disconnect(req);
/* The four handlers below are placeholders in this listing — their bodies
 * are elided; presumably unimplemented stubs at this stage of development
 * (TODO confirm against full source). */
234 static int mdt_getattr_name(struct mdt_thread_info *info,
235 struct ptlrpc_request *req, int offset)
240 static int mdt_setxattr(struct mdt_thread_info *info,
241 struct ptlrpc_request *req, int offset)
246 static int mdt_getxattr(struct mdt_thread_info *info,
247 struct ptlrpc_request *req, int offset)
252 static int mdt_readpage(struct mdt_thread_info *info,
253 struct ptlrpc_request *req, int offset)
/* Common reintegration path: unpack the reint record from the request and
 * execute it; @lockh optionally carries a lock handle for intent callers. */
258 static int mdt_reint_internal(struct mdt_thread_info *info,
259 struct ptlrpc_request *req,
261 struct mdt_lock_handle *lockh)
265 rc = mdt_reint_unpack(info, req, offset);
266 if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
267 CERROR("invalid record\n");
268 RETURN(rc = -EINVAL);
270 rc = mdt_reint_rec(info, lockh);
/* MDS_REINT handler: peek at the reint opcode to size the reply buffers
 * (unlink/rename need llog cookies, open needs LOV EA), pack the reply,
 * then run the common reint path. */
274 static int mdt_reint(struct mdt_thread_info *info,
275 struct ptlrpc_request *req, int offset)
277 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
284 /* NB only peek inside req now; mdt_XXX_unpack() will swab it */
286 CERROR ("Can't inspect opcode\n");
/* Opcode may arrive in the peer's byte order; swab the peeked copy only. */
290 if (lustre_msg_swabbed (req->rq_reqmsg))
293 DEBUG_REQ(D_INODE, req, "reint opt = %d", opc);
295 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
297 if (opc == REINT_UNLINK || opc == REINT_RENAME)
298 info->mti_rep_buf_nr = 3;
299 else if (opc == REINT_OPEN)
300 info->mti_rep_buf_nr = 2;
302 info->mti_rep_buf_nr = 1;
303 info->mti_rep_buf_size[0] = sizeof(struct mdt_body);
304 info->mti_rep_buf_size[1] = sizeof(struct lov_mds_md); /*FIXME:See mds*/
305 info->mti_rep_buf_size[2] = sizeof(struct llog_cookie);/*FIXME:See mds*/
306 rc = lustre_pack_reply(req, info->mti_rep_buf_nr,
307 info->mti_rep_buf_size, NULL);
310 rc = mdt_reint_internal(info, req, offset, NULL);
/* Handler stubs — bodies elided in this listing; presumably not yet
 * implemented (TODO confirm against full source). */
314 static int mdt_close(struct mdt_thread_info *info,
315 struct ptlrpc_request *req, int offset)
320 static int mdt_done_writing(struct mdt_thread_info *info,
321 struct ptlrpc_request *req, int offset)
326 static int mdt_pin(struct mdt_thread_info *info,
327 struct ptlrpc_request *req, int offset)
332 static int mdt_sync(struct mdt_thread_info *info,
333 struct ptlrpc_request *req, int offset)
338 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
339 struct ptlrpc_request *req, int offset)
344 static int mdt_handle_quotactl(struct mdt_thread_info *info,
345 struct ptlrpc_request *req, int offset)
/* AST callbacks handed to the DLM for server-side lock processing. */
354 static struct ldlm_callback_suite cbs = {
355 .lcs_completion = ldlm_server_completion_ast,
356 .lcs_blocking = ldlm_server_blocking_ast,
/* LDLM_ENQUEUE handler: pass the pre-swabbed dlm request (set up via
 * HABEO_CLAVIS in mdt_req_handle) to the generic enqueue path. */
360 static int mdt_enqueue(struct mdt_thread_info *info,
361 struct ptlrpc_request *req, int offset)
364 * info->mti_dlm_req already contains swapped and (if necessary)
365 * converted dlm request.
367 LASSERT(info->mti_dlm_req != NULL);
369 info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
370 return ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
371 req, info->mti_dlm_req, &cbs);
/* LDLM_CONVERT handler: delegate to the generic convert path. */
374 static int mdt_convert(struct mdt_thread_info *info,
375 struct ptlrpc_request *req, int offset)
377 LASSERT(info->mti_dlm_req);
378 return ldlm_handle_convert0(req, info->mti_dlm_req);
/* Blocking/completion callbacks are client-side notions; receiving one
 * on the MDS indicates a protocol error. */
381 static int mdt_bl_callback(struct mdt_thread_info *info,
382 struct ptlrpc_request *req, int offset)
384 CERROR("bl callbacks should not happen on MDS\n");
389 static int mdt_cp_callback(struct mdt_thread_info *info,
390 struct ptlrpc_request *req, int offset)
392 CERROR("cp callbacks should not happen on MDS\n");
398 * Build (DLM) resource name from fid.
400 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
401 struct ldlm_res_id *name)
403 memset(name, 0, sizeof *name);
/* we use fid_num() which includes also object version instead of raw */
406 name->name[0] = fid_seq(f);
407 name->name[1] = fid_num(f);
412 * Return true if resource is for object identified by fid.
/* Compare only the two name components fid_build_res_name() fills in. */
414 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
416 return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
419 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
420 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
421 struct lustre_handle *lh, ldlm_mode_t mode,
422 ldlm_policy_data_t *policy)
424 struct ldlm_res_id res_id;
432 /* FIXME: is that correct to have @flags=0 here? */
/* Server-local enqueue: IBITS lock on the fid's resource, blocking and
 * completion ASTs from the generic ldlm layer. Any enqueue failure is
 * collapsed to -EIO. */
433 rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
434 LDLM_IBITS, policy, mode, &flags,
435 ldlm_blocking_ast, ldlm_completion_ast, NULL,
436 NULL, NULL, 0, NULL, lh);
437 RETURN (rc == ELDLM_OK ? 0 : -EIO);
/* Release a lock taken by fid_lock(); the resource-name check is a
 * debugging aid only. */
440 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
441 struct lustre_handle *lh, ldlm_mode_t mode)
443 struct ldlm_lock *lock;
446 /* FIXME: this is debug stuff, remove it later. */
447 lock = ldlm_handle2lock(lh);
/* NOTE(review): CERROR format string below lacks a trailing "\n". */
449 CERROR("invalid lock handle "LPX64, lh->cookie);
453 LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
455 ldlm_lock_decref(lh, mode);
/* Downcast a generic lu_object to its enclosing mdt_object. */
459 static struct mdt_object *mdt_obj(struct lu_object *o)
461 LASSERT(lu_device_is_mdt(o->lo_dev));
462 return container_of0(o, struct mdt_object, mot_obj.mo_lu);
/* Look up (or instantiate) the object for @f in the site's object index.
 * Caller owns a reference; release with mdt_object_put(). */
465 struct mdt_object *mdt_object_find(struct lu_context *ctxt,
466 struct mdt_device *d,
471 o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
473 return (struct mdt_object *)o;
/* Drop the reference taken by mdt_object_find(). */
478 void mdt_object_put(struct lu_context *ctxt, struct mdt_object *o)
480 lu_object_put(ctxt, &o->mot_obj.mo_lu);
/* Return the fid embedded in the object's lu_object header. */
483 static struct lu_fid *mdt_object_fid(struct mdt_object *o)
485 return lu_object_fid(&o->mot_obj.mo_lu);
/* Take an IBITS DLM lock on @o covering @ibits, storing the handle in @lh.
 * @lh must be unused and carry a valid mode on entry. */
488 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
489 struct mdt_lock_handle *lh, __u64 ibits)
491 ldlm_policy_data_t p = {
496 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
497 LASSERT(lh->mlh_mode != LCK_MINMODE);
499 return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
/* Release the lock in @lh if one is held, and invalidate the handle. */
502 void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
503 struct mdt_lock_handle *lh)
505 if (lustre_handle_is_used(&lh->mlh_lh)) {
506 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
507 lh->mlh_lh.cookie = 0;
/* Find the object for @f and lock it; on lock failure the object
 * reference is dropped before returning. */
511 struct mdt_object *mdt_object_find_lock(struct lu_context *ctxt,
512 struct mdt_device *d,
514 struct mdt_lock_handle *lh,
517 struct mdt_object *o;
519 o = mdt_object_find(ctxt, d, f);
523 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
525 mdt_object_put(ctxt, o);
/* Handler callback slot of struct mdt_handler (struct head elided in
 * this listing). */
537 int (*mh_act)(struct mdt_thread_info *info,
538 struct ptlrpc_request *req, int offset);
/* Per-handler preprocessing flags consumed by mdt_req_handle(). */
541 enum mdt_handler_flags {
543 * struct mdt_body is passed in the 0-th incoming buffer.
545 HABEO_CORPUS = (1 << 0),
547 * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
550 HABEO_CLAVIS = (1 << 1)
/* A contiguous opcode range mapped onto an array of handlers. */
553 struct mdt_opc_slice {
556 struct mdt_handler *mos_hs;
559 static struct mdt_opc_slice mdt_handlers[];
/* Locate the handler for @opc by scanning the opcode slices; returns NULL
 * for an unsupported opcode. */
561 static struct mdt_handler *mdt_handler_find(__u32 opc)
563 struct mdt_opc_slice *s;
564 struct mdt_handler *h;
567 for (s = mdt_handlers; s->mos_hs != NULL; s++) {
568 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
569 h = s->mos_hs + (opc - s->mos_opc_start);
571 LASSERT(h->mh_opc == opc);
573 h = NULL; /* unsupported opc */
/* Last xid seen from this client, from its persistent mds client data. */
580 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
582 return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
/* Compatibility shims for clients using old (pre-fid) DLM resource names;
 * both are unimplemented placeholders. */
585 static int mdt_lock_resname_compat(struct mdt_device *m,
586 struct ldlm_request *req)
588 /* XXX something... later. */
592 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
594 /* XXX something... later. */
599 * Invoke handler for this request opc. Also do necessary preprocessing
600 * (according to handler ->mh_flags), and post-processing (setting of
601 * ->last_{xid,committed}).
603 static int mdt_req_handle(struct mdt_thread_info *info,
604 struct mdt_handler *h, struct ptlrpc_request *req,
612 LASSERT(h->mh_act != NULL);
613 LASSERT(h->mh_opc == req->rq_reqmsg->opc);
/* Handlers must not leave a journal transaction open across requests. */
614 LASSERT(current->journal_info == NULL);
616 DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
618 if (h->mh_fail_id != 0)
619 OBD_FAIL_RETURN(h->mh_fail_id, 0);
/* @shift adjusts the record offset (e.g. for intent-embedded requests). */
621 off = MDS_REQ_REC_OFF + shift;
/* HABEO_CORPUS: swab the incoming mdt_body and resolve its fid into
 * info->mti_object before calling the handler. */
624 if (h->mh_flags & HABEO_CORPUS) {
625 struct mdt_body *body;
627 body = info->mti_body =
628 lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
629 lustre_swab_mdt_body);
631 info->mti_object = mdt_object_find(info->mti_ctxt,
634 if (IS_ERR(info->mti_object)) {
635 result = PTR_ERR(info->mti_object);
636 info->mti_object = NULL;
639 CERROR("Can't unpack body\n");
/* HABEO_CLAVIS: swab the ldlm_request and apply resource-name compat
 * conversion for old clients if required. */
642 } else if (h->mh_flags & HABEO_CLAVIS) {
643 struct ldlm_request *dlm;
646 dlm = info->mti_dlm_req =
647 lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
649 lustre_swab_ldlm_request);
651 if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
652 result = mdt_lock_resname_compat(info->mti_mdt,
655 CERROR("Can't unpack dlm request\n");
663 result = h->mh_act(info, req, off);
665 * XXX result value is unconditionally shoved into ->rq_status
666 * (original code sometimes placed error code into ->rq_status, and
667 * sometimes returned it to the
668 * caller). ptlrpc_server_handle_request() doesn't check return value
671 req->rq_status = result;
673 LASSERT(current->journal_info == NULL);
/* Old clients also need the reply's lock resource name converted back. */
675 if (h->mh_flags & HABEO_CLAVIS &&
676 info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
677 struct ldlm_reply *rep;
679 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
681 result = mdt_lock_reply_compat(info->mti_mdt, rep);
684 /* If we're DISCONNECTing, the mds_export_data is already freed */
685 if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
686 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
687 target_committed_to_req(req);
/* Reset a lock handle to the "unused" state. */
692 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
694 lh->mlh_lh.cookie = 0ull;
695 lh->mlh_mode = LCK_MINMODE;
/* A lock handle must be released (unused) before it is finalized. */
698 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
700 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
/* Prepare per-thread request state: poison reply buffer sizes, reset lock
 * handles, and enter the thread's lu_context. Paired with
 * mdt_thread_info_fini(). */
703 static void mdt_thread_info_init(struct mdt_thread_info *info)
707 info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
/* ~0 marks "uninitialized" so stale sizes are caught by the pack code. */
711 for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
712 info->mti_rep_buf_size[i] = ~0;
713 info->mti_rep_buf_nr = i;
714 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
715 mdt_lock_handle_init(&info->mti_lh[i]);
716 lu_context_enter(info->mti_ctxt);
/* Tear down per-thread request state: leave the context, drop any object
 * reference left by HABEO_CORPUS handling, verify locks are released. */
719 static void mdt_thread_info_fini(struct mdt_thread_info *info)
723 lu_context_exit(info->mti_ctxt);
724 if (info->mti_object != NULL) {
725 mdt_object_put(info->mti_ctxt, info->mti_object);
726 info->mti_object = NULL;
728 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
729 mdt_lock_handle_fini(&info->mti_lh[i]);
/* Verify the request's wire version against the expected version for its
 * opcode family (OBD / MDS / DLM / LLOG). Unknown opcodes are rejected.
 * NOTE(review): the switch statement head and case labels are partially
 * elided in this listing. */
732 static int mds_msg_check_version(struct lustre_msg *msg)
736 /* TODO: enable the below check while really introducing msg version.
737 * it's disabled because it will break compatibility with b1_4.
745 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
747 CERROR("bad opc %u version %08x, expecting %08x\n",
748 msg->opc, msg->version, LUSTRE_OBD_VERSION);
752 case MDS_GETATTR_NAME:
757 case MDS_DONE_WRITING:
767 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
769 CERROR("bad opc %u version %08x, expecting %08x\n",
770 msg->opc, msg->version, LUSTRE_MDS_VERSION);
774 case LDLM_BL_CALLBACK:
775 case LDLM_CP_CALLBACK:
776 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
778 CERROR("bad opc %u version %08x, expecting %08x\n",
779 msg->opc, msg->version, LUSTRE_DLM_VERSION);
782 case LLOG_ORIGIN_HANDLE_CREATE:
783 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
784 case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
785 case LLOG_ORIGIN_HANDLE_READ_HEADER:
786 case LLOG_ORIGIN_HANDLE_CLOSE:
788 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
790 CERROR("bad opc %u version %08x, expecting %08x\n",
791 msg->opc, msg->version, LUSTRE_LOG_VERSION);
794 CERROR("MDS unknown opcode %d\n", msg->opc);
/* While recovery is in progress, decide whether @req may be processed:
 * recovery-safe opcodes are queued via target_queue_recovery_request();
 * everything else is bounced with -EAGAIN. */
800 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
801 struct obd_device *obd, int *process)
803 switch (req->rq_reqmsg->opc) {
804 case MDS_CONNECT: /* This will never get here, but for completeness. */
805 case OST_CONNECT: /* This will never get here, but for completeness. */
812 case MDS_SYNC: /* used in unmounting */
816 *process = target_queue_recovery_request(req, obd);
820 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
822 /* XXX what should we set rq_status to here? */
823 req->rq_status = -EAGAIN;
824 RETURN(ptlrpc_error(req));
829 * Handle recovery. Return:
830 * +1: continue request processing;
831 * -ve: abort immediately with the given error code;
832 * 0: send reply with error code in req->rq_status;
834 static int mdt_recovery(struct ptlrpc_request *req)
838 struct obd_device *obd;
/* CONNECT is the only request legal without an export. */
842 if (req->rq_reqmsg->opc == MDS_CONNECT)
845 if (req->rq_export == NULL) {
846 CERROR("operation %d on unconnected MDS from %s\n",
848 libcfs_id2str(req->rq_peer));
849 req->rq_status = -ENOTCONN;
853 /* sanity check: if the xid matches, the request must be marked as a
854 * resent or replayed */
855 LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
856 lustre_msg_get_flags(req->rq_reqmsg) &
857 (MSG_RESENT | MSG_REPLAY)),
858 "rq_xid "LPU64" matches last_xid, "
859 "expected RESENT flag\n", req->rq_xid);
861 /* else: note the opposite is not always true; a RESENT req after a
862 * failover will usually not match the last_xid, since it was likely
863 * never committed. A REPLAYed request will almost never match the
864 * last xid, however it could for a committed, but still retained,
867 obd = req->rq_export->exp_obd;
869 /* Check for aborted recovery... */
/* Snapshot the recovery flags under the processing-task lock. */
870 spin_lock_bh(&obd->obd_processing_task_lock);
871 abort_recovery = obd->obd_abort_recovery;
872 recovering = obd->obd_recovering;
873 spin_unlock_bh(&obd->obd_processing_task_lock);
874 if (abort_recovery) {
875 target_abort_recovery(obd);
876 } else if (recovering) {
880 rc = mdt_filter_recovery_request(req, obd, &should_process);
881 if (rc != 0 || !should_process) {
/* Send the reply for @req; during recovery a LAST_REPLAY request's reply
 * is queued until recovery completes. */
889 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
891 struct obd_device *obd;
893 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
894 if (req->rq_reqmsg->opc != OBD_PING)
895 DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
897 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
898 if (obd && obd->obd_recovering) {
899 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
900 RETURN(target_queue_final_reply(req, req->rq_status));
902 /* Lost a race with recovery; let the error path
904 req->rq_status = -ENOTCONN;
907 target_send_reply(req, req->rq_status, info->mti_fail_id);
908 RETURN(req->rq_status);
/* Core request dispatch: version check, recovery gating, handler lookup
 * and invocation, then reply. Unknown opcodes get -ENOTSUPP. */
911 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
913 struct mdt_handler *h;
914 struct lustre_msg *msg;
919 OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
921 LASSERT(current->journal_info == NULL);
923 msg = req->rq_reqmsg;
924 result = mds_msg_check_version(msg);
926 result = mdt_recovery(req);
929 h = mdt_handler_find(msg->opc);
931 result = mdt_req_handle(info, h, req, 0);
933 req->rq_status = -ENOTSUPP;
934 result = ptlrpc_error(req);
939 result = mdt_reply(req, info);
942 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
947 * MDT handler function called by ptlrpc service thread when request comes.
949 * XXX common "target" functionality should be factored into separate module
950 * shared by mdt, ost and stand-alone services like fld.
952 static int mdt_handle(struct ptlrpc_request *req)
955 struct lu_context *ctx;
956 struct mdt_thread_info *info;
/* Per-thread state lives in the service thread's lu_context. */
959 ctx = req->rq_svc_thread->t_ctx;
960 LASSERT(ctx != NULL);
961 LASSERT(ctx->lc_thread == req->rq_svc_thread);
963 info = lu_context_key_get(ctx, &mdt_thread_key);
964 LASSERT(info != NULL);
966 mdt_thread_info_init(info);
967 /* it can be NULL while CONNECT */
969 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
971 result = mdt_handle0(req, info);
972 mdt_thread_info_fini(info);
976 /*Please move these function from mds to mdt*/
/* Test / set a DISP_* disposition bit in the intent lock reply. */
977 int intent_disposition(struct ldlm_reply *rep, int flag)
981 return (rep->lock_policy_res1 & flag);
984 void intent_set_disposition(struct ldlm_reply *rep, int flag)
988 rep->lock_policy_res1 |= flag;
/* For a MSG_RESENT request, try to find the lock we granted on the first
 * transmission (matching the client's remote handle) and restore its
 * cookie into @lockh so reply reconstruction reuses the same lock. If no
 * match and the xid also doesn't match, clear MSG_RESENT and handle the
 * request as new. */
991 static void fixup_handle_for_resent_req(struct mdt_thread_info *info,
992 struct ptlrpc_request *req,
994 struct ldlm_lock *new_lock,
995 struct ldlm_lock **old_lock,
996 struct mdt_lock_handle *lockh)
998 struct obd_export *exp = req->rq_export;
999 struct mdt_device * mdt = info->mti_mdt;
1000 struct ldlm_request *dlmreq =
1001 lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq));
1002 struct lustre_handle remote_hdl = dlmreq->lock_handle1;
1003 struct list_head *iter;
1005 if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
/* Walk the export's held-lock list under the namespace lock. */
1008 l_lock(&mdt->mdt_namespace->ns_lock);
1009 list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
1010 struct ldlm_lock *lock;
1011 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
1012 if (lock == new_lock)
1014 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
1015 lockh->mlh_lh.cookie = lock->l_handle.h_cookie;
1016 LDLM_DEBUG(lock, "restoring lock cookie");
1017 DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
1018 lockh->mlh_lh.cookie);
1020 *old_lock = LDLM_LOCK_GET(lock);
1021 l_unlock(&mdt->mdt_namespace->ns_lock);
1025 l_unlock(&mdt->mdt_namespace->ns_lock);
1027 /* If the xid matches, then we know this is a resent request,
1028 * and allow it. (It's probably an OPEN, for which we don't
1031 le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
1034 /* This remote handle isn't enqueued, so we never received or
1035 * processed this request. Clear MSG_RESENT, because it can
1036 * be handled like any normal request now. */
1038 lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1040 DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
/* DLM intent policy for the MDT namespace: unpack the ldlm_intent, size
 * and pack the reply, execute the intent (open/create, getattr/lookup,
 * ...), and either abort the lock or hand the resulting lock back to the
 * DLM via ELDLM_LOCK_REPLACED.
 * NOTE(review): many lines are elided from this listing; control flow
 * (case labels, returns) is incomplete here. */
1044 static int mdt_intent_policy(struct ldlm_namespace *ns,
1045 struct ldlm_lock **lockp, void *req_cookie,
1046 ldlm_mode_t mode, int flags, void *data)
1048 struct ptlrpc_request *req = req_cookie;
1049 struct ldlm_lock *lock = *lockp;
1050 struct ldlm_intent *it;
1051 struct ldlm_reply *rep;
1052 struct mdt_lock_handle lockh = { {0} };
1053 struct ldlm_lock *new_lock = NULL;
1054 int getattr_part = MDS_INODELOCK_UPDATE;
1055 int offset = MDS_REQ_INTENT_REC_OFF;
1057 struct mdt_thread_info *info;
1061 LASSERT(req != NULL);
1063 /* We already got it in mdt_handle. But we have to do it again*/
1064 info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
1065 LASSERT(info != NULL);
1066 mdt_thread_info_init(info);
/* No intent buffer present: plain enqueue, just pack an ldlm_reply. */
1069 if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
1070 /* No intent was provided */
1071 info->mti_rep_buf_size[0] = sizeof(struct ldlm_reply);
1072 rc = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
1074 mdt_thread_info_fini(info);
1078 it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
1079 lustre_swab_ldlm_intent);
1081 CERROR("Intent missing\n");
1082 mdt_thread_info_fini(info);
1083 RETURN(req->rq_status = -EFAULT);
1086 LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
/* Reply layout: ldlm_reply, mdt_body, LOV EA, plus an optional 4th
 * buffer (ACL for open/getattr/lookup, llog cookie for unlink). */
1087 info->mti_rep_buf_nr = 3;
1088 info->mti_rep_buf_size[0] = sizeof(*rep);
1089 info->mti_rep_buf_size[1] = sizeof(struct mdt_body);
1090 info->mti_rep_buf_size[2] = sizeof(struct lov_mds_md);/*FIXME:See mds*/
1092 if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
1093 (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP))){
1094 /* we should never allow OBD_CONNECT_ACL if not configured */
1095 info->mti_rep_buf_size[info->mti_rep_buf_nr++] =
1096 LUSTRE_POSIX_ACL_MAX_SIZE;
1098 else if (it->opc & IT_UNLINK){
1099 info->mti_rep_buf_size[info->mti_rep_buf_nr++] =
1100 sizeof(struct llog_cookie);
1104 rc = lustre_pack_reply(req, info->mti_rep_buf_nr,
1105 info->mti_rep_buf_size, NULL);
1107 mdt_thread_info_fini(info);
1108 RETURN(req->rq_status = rc);
1111 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
1112 intent_set_disposition(rep, DISP_IT_EXECD);
1115 /* execute policy */
1116 switch ((long)it->opc) {
1118 case IT_CREAT|IT_OPEN:
1119 fixup_handle_for_resent_req(info, req,
1120 MDS_REQ_INTENT_LOCKREQ_OFF,
1121 lock, NULL, &lockh);
1122 /* XXX swab here to assert that an mds_open reint
1123 * packet is following */
1124 rep->lock_policy_res2 = mdt_reint_internal(info, req,
1127 /* We abort the lock if the lookup was negative and
1128 * we did not make it to the OPEN portion */
1129 if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
1130 RETURN(ELDLM_LOCK_ABORTED);
1131 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
1132 !intent_disposition(rep, DISP_OPEN_OPEN))
1133 RETURN(ELDLM_LOCK_ABORTED);
/* Getattr/lookup intents: choose which inode bits to take. */
1137 getattr_part = MDS_INODELOCK_LOOKUP;
1139 getattr_part |= MDS_INODELOCK_LOOKUP;
1142 fixup_handle_for_resent_req(info, req,
1143 MDS_REQ_INTENT_LOCKREQ_OFF,
1144 lock, &new_lock, &lockh);
1146 /* INODEBITS_INTEROP: if this lock was converted from a
1147 * plain lock (client does not support inodebits), then
1148 * child lock must be taken with both lookup and update
1149 * bits set for all operations.
1151 if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
1152 getattr_part = MDS_INODELOCK_LOOKUP |
1153 MDS_INODELOCK_UPDATE;
1155 rep->lock_policy_res2 = mds_getattr_name(offset, req,
1156 getattr_part, &lockh);
1157 /* FIXME: LDLM can set req->rq_status. MDS sets
1158 policy_res{1,2} with disposition and status.
1159 - replay: returns 0 & req->status is old status
1160 - otherwise: returns req->status */
1161 if (intent_disposition(rep, DISP_LOOKUP_NEG))
1162 rep->lock_policy_res2 = 0;
1163 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
1164 rep->lock_policy_res2)
1165 RETURN(ELDLM_LOCK_ABORTED);
1166 if (req->rq_status != 0) {
1168 rep->lock_policy_res2 = req->rq_status;
1169 RETURN(ELDLM_LOCK_ABORTED);
1172 RETURN(ELDLM_LOCK_ABORTED);
1175 CERROR("Unhandled intent "LPD64"\n", it->opc);
1179 /* By this point, whatever function we called above must have either
1180 * filled in 'lockh', been an intent replay, or returned an error. We
1181 * want to allow replayed RPCs to not get a lock, since we would just
1182 * drop it below anyways because lock replay is done separately by the
1183 * client afterwards. For regular RPCs we want to give the new lock to
1184 * the client instead of whatever lock it was about to get. */
1185 if (new_lock == NULL)
1186 new_lock = ldlm_handle2lock(&lockh.mlh_lh);
1187 if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1190 LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
1191 it->opc, lockh.mlh_lh.cookie);
1193 /* If we've already given this lock to a client once, then we should
1194 * have no readers or writers. Otherwise, we should have one reader
1195 * _or_ writer ref (which will be zeroed below) before returning the
1196 * lock to a client. */
1197 if (new_lock->l_export == req->rq_export) {
1198 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
1200 LASSERT(new_lock->l_export == NULL);
1201 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
1206 if (new_lock->l_export == req->rq_export) {
1207 /* Already gave this to the client, which means that we
1208 * reconstructed a reply. */
1209 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1211 RETURN(ELDLM_LOCK_REPLACED);
1214 /* Fixup the lock to be given to the client */
1215 l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1216 new_lock->l_readers = 0;
1217 new_lock->l_writers = 0;
1219 new_lock->l_export = class_export_get(req->rq_export);
1220 list_add(&new_lock->l_export_chain,
1221 &new_lock->l_export->exp_ldlm_data.led_held_locks);
/* Inherit the ASTs and the client's handle from the intent lock. */
1223 new_lock->l_blocking_ast = lock->l_blocking_ast;
1224 new_lock->l_completion_ast = lock->l_completion_ast;
1226 memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
1227 sizeof(lock->l_remote_handle));
1229 new_lock->l_flags &= ~LDLM_FL_LOCAL;
1231 LDLM_LOCK_PUT(new_lock);
1232 l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1234 RETURN(ELDLM_LOCK_REPLACED);
/* Read or write a named configuration blob through the child md device. */
1237 static int mdt_config(struct lu_context *ctx, struct mdt_device *m,
1238 const char *name, void *buf, int size, int mode)
1240 struct md_device *child = m->mdt_child;
1242 RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode));
/* Sequence-manager helper: load/store the metadata sequence value via the
 * LUSTRE_CONFIG_METASEQ config entry. */
1245 static int mdt_seq_mgr_hpr(struct lu_context *ctx, void *opaque, __u64 *seq,
1248 struct mdt_device *m = opaque;
1252 rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ,
1253 seq, sizeof(*seq), mode);
1257 static int mdt_seq_mgr_read(struct lu_context *ctx, void *opaque, __u64 *seq)
1260 RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET));
1263 static int mdt_seq_mgr_write(struct lu_context *ctx, void *opaque, __u64 *seq)
1266 RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET));
/* Ops vector plugged into the generic sequence manager. */
1269 struct lu_seq_mgr_ops seq_mgr_ops = {
1270 .smo_read = mdt_seq_mgr_read,
1271 .smo_write = mdt_seq_mgr_write
/* Allocate and start the FID-location database server on this site. */
1278 static int mdt_fld_init(struct mdt_device *m)
1284 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1286 OBD_ALLOC_PTR(ls->ls_fld);
1288 if (ls->ls_fld != NULL)
1289 rc = fld_server_init(ls->ls_fld, m->mdt_bottom);
/* Stop and free the FLD server if it was set up. */
1296 static int mdt_fld_fini(struct mdt_device *m)
1298 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1301 if (ls && ls->ls_fld) {
1302 fld_server_fini(ls->ls_fld);
1303 OBD_FREE_PTR(ls->ls_fld);
1308 /* device init/fini methods */
/* Unregister the MDT ptlrpc service if it is running. */
1310 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
1312 if (m->mdt_service != NULL) {
1313 ptlrpc_unregister_service(m->mdt_service);
1314 m->mdt_service = NULL;
/* Create the MDS request service, register the ldlm callback client, and
 * start the service threads; tears the service down on thread-start
 * failure. */
1318 static int mdt_start_ptlrpc_service(struct mdt_device *m)
1321 struct ptlrpc_service_conf conf = {
1322 .psc_nbufs = MDS_NBUFS,
1323 .psc_bufsize = MDS_BUFSIZE,
1324 .psc_max_req_size = MDS_MAXREQSIZE,
1325 .psc_max_reply_size = MDS_MAXREPSIZE,
1326 .psc_req_portal = MDS_REQUEST_PORTAL,
1327 .psc_rep_portal = MDC_REPLY_PORTAL,
1328 .psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT,
1330 * We'd like to have a mechanism to set this on a per-device
1331 * basis, but alas...
1333 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
1340 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1341 "mdt_ldlm_client", &m->mdt_ldlm_client);
1344 ptlrpc_init_svc_conf(&conf, mdt_handle, LUSTRE_MDT0_NAME,
1345 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1347 if (m->mdt_service == NULL)
1350 rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1352 GOTO(err_mdt_svc, rc);
/* Error path: undo the service registration. */
1356 ptlrpc_unregister_service(m->mdt_service);
1357 m->mdt_service = NULL;
/*
 * Walk the device stack downward starting at 'd', calling each layer's
 * ldto_device_fini()/ldto_device_free() and dropping the obd type
 * reference taken in mdt_layer_setup().
 */
1362 static void mdt_stack_fini(struct mdt_device *m, struct lu_device *d)
1364 /* goes through all stack */
1366 struct lu_device *n;
1367 struct obd_type *type;
1368 struct lu_device_type *ldt = d->ld_type;
1372 /* each fini() returns the next device in the stack of layers,
1373 * so we can avoid the recursion */
1374 n = ldt->ldt_ops->ldto_device_fini(d);
1375 ldt->ldt_ops->ldto_device_free(d);
1377 type = ldt->ldt_obd_type;
/* balance the class_get_type() done when the layer was set up */
1379 class_put_type(type);
1380 /* switch to the next device in the layer */
1383 m->mdt_child = NULL;
/*
 * Instantiate one layer of the MDT device stack: look up the obd type
 * by name, allocate a device of that type, attach it below 'child' and
 * initialize it.  Returns the new lu_device or ERR_PTR(rc); the out_*
 * labels unwind the allocation and the type reference on failure.
 */
1386 static struct lu_device *mdt_layer_setup(const char *typename,
1387 struct lu_device *child,
1388 struct lustre_cfg *cfg)
1390 struct obd_type *type;
1391 struct lu_device_type *ldt;
1392 struct lu_device *d;
/* pin the obd type for the lifetime of this layer */
1396 type = class_get_type(typename);
1398 CERROR("Unknown type: '%s'\n", typename);
1399 GOTO(out, rc = -ENODEV);
/* remember the type so mdt_stack_fini() can drop the reference */
1403 ldt->ldt_obd_type = type;
1405 CERROR("type: '%s'\n", typename);
1406 GOTO(out_type, rc = -EINVAL);
1409 d = ldt->ldt_ops->ldto_device_alloc(ldt, cfg);
1411 CERROR("Cannot allocate device: '%s'\n", typename);
1412 GOTO(out_type, rc = -ENODEV);
/* the new layer shares the lu_site of the device above it */
1415 LASSERT(child->ld_site);
1416 d->ld_site = child->ld_site;
1419 rc = ldt->ldt_ops->ldto_device_init(d, child);
1421 CERROR("can't init device '%s', rc %d\n", typename, rc);
1422 GOTO(out_alloc, rc);
/* error unwinding: free the device, then drop the type ref */
1428 ldt->ldt_ops->ldto_device_free(d);
1431 class_put_type(type);
1433 RETURN(ERR_PTR(rc));
/*
 * Build the MDT's underlying device stack (OSD -> MDD -> CMM) beneath
 * this device and feed the setup config down through it.  On failure
 * the partially built stack is torn down via mdt_stack_fini().
 */
1436 static int mdt_stack_init(struct mdt_device *m, struct lustre_cfg *cfg)
1438 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1439 struct lu_device *tmp;
1442 /* init the stack */
/* bottom layer: the object storage device (OSD) */
1443 tmp = mdt_layer_setup(LUSTRE_OSD0_NAME, d, cfg);
1445 RETURN (PTR_ERR(tmp));
1447 m->mdt_bottom = lu2dt_dev(tmp);
/* meta-data device (MDD) next */
1449 tmp = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
1451 GOTO(out, rc = PTR_ERR(tmp));
/* clustered metadata manager (CMM) on top */
1454 tmp = mdt_layer_setup(LUSTRE_CMM0_NAME, d, cfg);
1456 GOTO(out, rc = PTR_ERR(tmp));
/* NOTE(review): 'd' is presumably reassigned to each new layer on
 * elided lines, so the child recorded here is the topmost layer
 * below the MDT -- confirm against the full source */
1459 m->mdt_child = lu2md_dev(d);
1461 /* process setup config */
1462 tmp = &m->mdt_md_dev.md_lu_dev;
1463 rc = tmp->ld_ops->ldo_process_config(tmp, cfg);
1466 /* fini from last known good lu_device */
1468 mdt_stack_fini(m, d);
/*
 * Device destructor: stop the ptlrpc service, dismantle the lower
 * stack, then release the site, LDLM namespace and sequence manager.
 */
1473 static void mdt_fini(struct mdt_device *m)
1475 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1479 mdt_stop_ptlrpc_service(m);
1481 /* finish the stack */
1482 mdt_stack_fini(m, md2lu_dev(m->mdt_child));
1484 if (d->ld_site != NULL) {
1485 lu_site_fini(d->ld_site);
1486 OBD_FREE_PTR(d->ld_site);
1489 if (m->mdt_namespace != NULL) {
/* second argument 0 is presumably the 'force' flag off -- verify */
1490 ldlm_namespace_free(m->mdt_namespace, 0);
1491 m->mdt_namespace = NULL;
1494 if (m->mdt_seq_mgr) {
1495 seq_mgr_fini(m->mdt_seq_mgr);
1496 m->mdt_seq_mgr = NULL;
/* by now all references to this device must be gone */
1499 LASSERT(atomic_read(&d->ld_ref) == 0);
1500 md_device_fini(&m->mdt_md_dev);
/*
 * One-time device construction: wire up the obd, init the lu_site,
 * build the lower stack, create the sequence manager, LDLM namespace,
 * FLD server and finally the ptlrpc service.  Error paths unwind in
 * reverse order via the err_* labels at the bottom.
 * NOTE(review): listing is elided; declarations ('s', 'ns_name',
 * 'rc'), several checks and the final RETURN are not visible here.
 */
1504 static int mdt_init0(struct mdt_device *m,
1505 struct lu_device_type *t, struct lustre_cfg *cfg)
1510 struct lu_context ctx;
/* cfg string 0 is the obd device name */
1511 const char *dev = lustre_cfg_string(cfg, 0);
1512 struct obd_device *obd;
1515 obd = class_name2obd(dev);
1516 m->mdt_md_dev.md_lu_dev.ld_obd = obd;
1522 md_device_init(&m->mdt_md_dev, t);
1523 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1525 rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
1527 CERROR("can't init lu_site, rc %d\n", rc);
1528 GOTO(err_fini_site, rc);
1531 /* init the stack */
1532 rc = mdt_stack_init(m, cfg);
1534 CERROR("can't init device stack, rc %d\n", rc);
1535 GOTO(err_fini_site, rc);
1538 m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
1539 if (!m->mdt_seq_mgr) {
1540 CERROR("can't initialize sequence manager\n");
/* NOTE(review): no assignment to rc is visible on this failure
 * path -- confirm an error code is set before the goto */
1541 GOTO(err_fini_stack, rc);
/* a temporary context is needed to run seq_mgr_setup() */
1544 rc = lu_context_init(&ctx);
1546 GOTO(err_fini_mgr, rc);
1548 lu_context_enter(&ctx);
1549 /* init sequence info after device stack is initialized. */
1550 rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
1551 lu_context_exit(&ctx);
1553 GOTO(err_fini_ctx, rc);
1555 lu_context_fini(&ctx);
/* per-device LDLM namespace, named after this mdt instance */
1557 snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1558 m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1559 if (m->mdt_namespace == NULL)
/* NOTE(review): jumping to err_fini_site here appears to skip the
 * seq-mgr and stack cleanup done by the later labels -- verify
 * against the full source; looks like a leak on this path */
1560 GOTO(err_fini_site, rc = -ENOMEM);
1562 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1564 rc = mdt_fld_init(m);
1566 GOTO(err_free_ns, rc);
1568 rc = mdt_start_ptlrpc_service(m);
1570 GOTO(err_free_fld, rc);
/* error unwinding, innermost acquisition first */
1577 ldlm_namespace_free(m->mdt_namespace, 0);
1578 m->mdt_namespace = NULL;
1580 lu_context_fini(&ctx);
1582 seq_mgr_fini(m->mdt_seq_mgr);
1583 m->mdt_seq_mgr = NULL;
1585 mdt_stack_fini(m, md2lu_dev(m->mdt_child));
1592 /* used by MGS to process specific configurations */
/*
 * Config dispatcher: MDT-specific lcfg commands would be handled in
 * the (elided) switch cases; everything else is delegated to the next
 * device in the stack.
 */
1593 static int mdt_process_config(struct lu_device *d, struct lustre_cfg *cfg)
1595 struct lu_device *next = md2lu_dev(mdt_dev(d)->mdt_child);
1599 switch (cfg->lcfg_command) {
1600 /* all MDT specific commands should be here */
1602 /* others are passed further */
1603 err = next->ld_ops->ldo_process_config(next, cfg);
/*
 * lu_device op: allocate an mdt_object and set it up as the top
 * object of a new compound lu_object (the header is embedded in the
 * mdt_object).  Allocation and failure branch are on elided lines.
 */
1608 static struct lu_object *mdt_object_alloc(struct lu_context *ctxt,
1609 struct lu_device *d)
1611 struct mdt_object *mo;
1615 struct lu_object *o;
1616 struct lu_object_header *h;
1618 o = &mo->mot_obj.mo_lu;
1619 h = &mo->mot_header;
1620 lu_object_header_init(h);
1621 lu_object_init(o, h, d);
/* this object becomes the top of the layered object stack */
1622 lu_object_add_top(h, o);
1623 o->lo_ops = &mdt_obj_ops;
/*
 * lu_object op: allocate the corresponding object of the child (md)
 * layer and link it below this one.
 */
1629 static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
1631 struct mdt_device *d = mdt_dev(o->lo_dev);
1632 struct lu_device *under;
1633 struct lu_object *below;
1635 under = &d->mdt_child->md_lu_dev;
1636 below = under->ld_ops->ldo_object_alloc(ctxt, under);
1637 if (below != NULL) {
1638 lu_object_add(o, below);
/* lu_object op: tear down the header and free the mdt_object. */
1644 static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
1646 struct mdt_object *mo = mdt_obj(o);
1647 struct lu_object_header *h;
1651 lu_object_header_fini(h);
/* lu_object op: last-reference hook; body elided (presumably empty) */
1655 static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
/* lu_object op: existence check is delegated to the layer below */
1659 static int mdt_object_exists(struct lu_context *ctx, struct lu_object *o)
1661 struct lu_object *next = lu_object_next(o);
1663 return next->lo_ops->loo_object_exists(ctx, next);
/* lu_object op: one-line debug representation for seq_file output */
1666 static int mdt_object_print(struct lu_context *ctxt,
1667 struct seq_file *f, const struct lu_object *o)
1669 return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
/* device-level operations: object allocation and config processing */
1672 static struct lu_device_operations mdt_lu_ops = {
1673 .ldo_object_alloc = mdt_object_alloc,
1674 .ldo_object_free = mdt_object_free,
1675 .ldo_process_config = mdt_process_config
/* per-object operations wired into each mdt_object */
1678 static struct lu_object_operations mdt_obj_ops = {
1679 .loo_object_init = mdt_object_init,
1680 .loo_object_release = mdt_object_release,
1681 .loo_object_print = mdt_object_print,
1682 .loo_object_exists = mdt_object_exists
1685 /* mds_connect_internal */
/*
 * Negotiate connect flags with the client: mask out capabilities the
 * server does not support (inodebits, ACL, user xattr) and record the
 * agreed flags on the export.
 */
1686 static int mdt_connect0(struct mdt_device *mdt,
1687 struct obd_export *exp, struct obd_connect_data *data)
1690 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
1691 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
1693 /* If no known bits (which should not happen, probably,
1694 as everybody should support LOOKUP and UPDATE bits at least)
1695 revert to compat mode with plain locks. */
1696 if (!data->ocd_ibits_known &&
1697 data->ocd_connect_flags & OBD_CONNECT_IBITS)
1698 data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
1700 if (!mdt->mdt_opts.mo_acl)
1701 data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
1703 if (!mdt->mdt_opts.mo_user_xattr)
1704 data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
/* record what was agreed, and tell the client our version */
1706 exp->exp_connect_flags = data->ocd_connect_flags;
1707 data->ocd_version = LUSTRE_VERSION_CODE;
1708 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
/* warn (and, per the elided lines, possibly fail) when the server
 * mandates ACLs but the client did not negotiate them */
1711 if (mdt->mdt_opts.mo_acl &&
1712 ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
1713 CWARN("%s: MDS requires ACL support but client does not\n",
1714 mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
1720 /* mds_connect copy */
/*
 * obd_ops connect handler: create the export, negotiate flags via
 * mdt_connect0() and attach per-client data (mcd) to the export.
 * NOTE(review): the allocation of 'mcd' and its attachment to 'med'
 * happen on elided lines before the memcpy below.
 */
1721 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
1722 struct obd_uuid *cluuid,
1723 struct obd_connect_data *data)
1725 struct obd_export *exp;
1727 struct mdt_device *mdt;
1728 struct mds_export_data *med;
1729 struct mds_client_data *mcd = NULL;
/* reject malformed calls up front */
1732 if (!conn || !obd || !cluuid)
1735 mdt = mdt_dev(obd->obd_lu_dev);
1737 rc = class_connect(conn, obd, cluuid);
1741 exp = class_conn2export(conn);
1742 LASSERT(exp != NULL);
1743 med = &exp->exp_mds_data;
1745 rc = mdt_connect0(mdt, exp, data);
/* remember which client this export belongs to */
1749 memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
/* error path: undo class_connect(); the export ref is dropped last */
1755 class_disconnect(exp);
1757 class_export_put(exp);
/*
 * obd_ops disconnect handler: detach the export early so clients
 * cannot keep using it, flush its outstanding difficult replies,
 * then free the per-client data and drop the references.
 */
1762 static int mdt_obd_disconnect(struct obd_export *exp)
1764 struct mds_export_data *med = &exp->exp_mds_data;
1765 unsigned long irqflags;
/* hold an extra reference across the teardown */
1770 class_export_get(exp);
1772 /* Disconnect early so that clients can't keep using export */
1773 rc = class_disconnect(exp);
1774 //ldlm_cancel_locks_for_export(exp);
1776 /* complete all outstanding replies */
1777 spin_lock_irqsave(&exp->exp_lock, irqflags);
1778 while (!list_empty(&exp->exp_outstanding_replies)) {
1779 struct ptlrpc_reply_state *rs =
1780 list_entry(exp->exp_outstanding_replies.next,
1781 struct ptlrpc_reply_state, rs_exp_list);
1782 struct ptlrpc_service *svc = rs->rs_service;
/* nested lock: service lock taken inside the irq-saved export lock */
1784 spin_lock(&svc->srv_lock);
1785 list_del_init(&rs->rs_exp_list);
1786 ptlrpc_schedule_difficult_reply(rs);
1787 spin_unlock(&svc->srv_lock);
1789 spin_unlock_irqrestore(&exp->exp_lock, irqflags);
/* free the client data attached in mdt_obd_connect() */
1791 OBD_FREE_PTR(med->med_mcd);
1793 class_export_put(exp);
/* obd_ops vector registered for the mdt type in mdt_mod_init() */
1797 static struct obd_ops mdt_obd_device_ops = {
1798 .o_owner = THIS_MODULE,
1799 .o_connect = mdt_obd_connect,
1800 .o_disconnect = mdt_obd_disconnect,
/*
 * lu_device_type op: allocate and initialize a new mdt_device; returns
 * the embedded lu_device or ERR_PTR on failure (the allocation and
 * its failure branch are on elided lines).
 */
1803 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1804 struct lustre_cfg *cfg)
1806 struct lu_device *l;
1807 struct mdt_device *m;
1813 l = &m->mdt_md_dev.md_lu_dev;
1814 result = mdt_init0(m, t, cfg);
1817 return ERR_PTR(result);
1821 l = ERR_PTR(-ENOMEM);
/* lu_device_type op: counterpart of mdt_device_alloc() */
1825 static void mdt_device_free(struct lu_device *d)
1827 struct mdt_device *m = mdt_dev(d);
/*
 * lu_context key: per-service-thread mdt_thread_info, allocated when
 * a thread enters the context and freed when it leaves.
 */
1833 static void *mdt_thread_init(struct lu_context *ctx)
1835 struct mdt_thread_info *info;
1837 OBD_ALLOC_PTR(info);
1839 info->mti_ctxt = ctx;
/* allocation failed: return an error pointer instead of NULL */
1841 info = ERR_PTR(-ENOMEM);
/* release the per-thread info allocated by mdt_thread_init() */
1845 static void mdt_thread_fini(struct lu_context *ctx, void *data)
1847 struct mdt_thread_info *info = data;
1851 static struct lu_context_key mdt_thread_key = {
1852 .lct_init = mdt_thread_init,
1853 .lct_fini = mdt_thread_fini
/* register/deregister the per-thread context key for this type */
1856 static int mdt_type_init(struct lu_device_type *t)
1858 return lu_context_key_register(&mdt_thread_key);
1861 static void mdt_type_fini(struct lu_device_type *t)
1863 lu_context_key_degister(&mdt_thread_key);
/* type-level ops: key registration plus device alloc/free */
1866 static struct lu_device_type_operations mdt_device_type_ops = {
1867 .ldto_init = mdt_type_init,
1868 .ldto_fini = mdt_type_fini,
1870 .ldto_device_alloc = mdt_device_alloc,
1871 .ldto_device_free = mdt_device_free
/* the MDT device type, registered with the class subsystem below */
1874 static struct lu_device_type mdt_device_type = {
1875 .ldt_tags = LU_DEVICE_MD,
1876 .ldt_name = LUSTRE_MDT0_NAME,
1877 .ldt_ops = &mdt_device_type_ops
/* lprocfs tables: currently empty placeholders */
1880 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
1884 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
1888 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
/* module entry point: register the MDT obd type */
1890 static int __init mdt_mod_init(void)
1892 struct lprocfs_static_vars lvars;
/* NOTE(review): this unconditionally overwrites the mdt_num_threads
 * module parameter declared at the bottom of the file -- confirm
 * whether values passed at module load are meant to take effect */
1894 mdt_num_threads = MDT_NUM_THREADS;
1895 lprocfs_init_vars(mdt, &lvars);
1896 return class_register_type(&mdt_obd_device_ops, NULL,
1897 lvars.module_vars, LUSTRE_MDT0_NAME,
/* module exit point: unregister the type */
1901 static void __exit mdt_mod_exit(void)
1903 class_unregister_type(LUSTRE_MDT0_NAME);
/*
 * Handler-table helpers: DEF_HNDL builds one mdt_handler slot indexed
 * by (opcode - base opcode); the fail id, opcode and flags are derived
 * from the prefix/opc tokens by token pasting.
 */
1907 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn) \
1908 [prefix ## _ ## opc - prefix ## _ ## base] = { \
1910 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
1911 .mh_opc = prefix ## _ ## opc, \
1912 .mh_flags = flags, \
/* MDS handlers are indexed from MDS_GETATTR (the base MDS opcode) */
1916 #define DEF_MDT_HNDL(flags, name, fn) \
1917 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
/* dispatch table for MDS requests; the HABEO_CORPUS flag presumably
 * gates request preprocessing in mdt_handle() -- verify there */
1919 static struct mdt_handler mdt_mds_ops[] = {
1920 DEF_MDT_HNDL(0, CONNECT, mdt_connect),
1921 DEF_MDT_HNDL(0, DISCONNECT, mdt_disconnect),
1922 DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus),
1923 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mdt_getattr),
1924 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mdt_getattr_name),
1925 DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mdt_setxattr),
1926 DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mdt_getxattr),
1927 DEF_MDT_HNDL(0, STATFS, mdt_statfs),
1928 DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mdt_readpage),
1929 DEF_MDT_HNDL(0, REINT, mdt_reint),
1930 DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mdt_close),
1931 DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
1932 DEF_MDT_HNDL(0, PIN, mdt_pin),
1933 DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync),
1934 DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck),
1935 DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl)
/* no OBD-specific handlers implemented yet */
1938 static struct mdt_handler mdt_obd_ops[] = {
/* LDLM handlers are indexed from LDLM_ENQUEUE (the base LDLM opcode) */
1941 #define DEF_DLM_HNDL(flags, name, fn) \
1942 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
1944 static struct mdt_handler mdt_dlm_ops[] = {
1945 DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
1946 DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT, mdt_convert),
1947 DEF_DLM_HNDL(0, BL_CALLBACK, mdt_bl_callback),
1948 DEF_DLM_HNDL(0, CP_CALLBACK, mdt_cp_callback)
/* no llog handlers implemented yet */
1951 static struct mdt_handler mdt_llog_ops[] = {
/* opcode-range -> handler-table map used by the request dispatcher */
1954 static struct mdt_opc_slice mdt_handlers[] = {
1956 .mos_opc_start = MDS_GETATTR,
1957 .mos_opc_end = MDS_LAST_OPC,
1958 .mos_hs = mdt_mds_ops
1961 .mos_opc_start = OBD_PING,
1962 .mos_opc_end = OBD_LAST_OPC,
1963 .mos_hs = mdt_obd_ops
1966 .mos_opc_start = LDLM_ENQUEUE,
1967 .mos_opc_end = LDLM_LAST_OPC,
1968 .mos_hs = mdt_dlm_ops
1971 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1972 .mos_opc_end = LLOG_LAST_OPC,
1973 .mos_hs = mdt_llog_ops
/* module metadata and the tunable number of service threads */
1980 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1981 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1982 MODULE_LICENSE("GPL");
/* NOTE(review): mdt_mod_init() assigns MDT_NUM_THREADS to this
 * variable unconditionally, so a value supplied at module load
 * appears to be ignored -- confirm intended behavior */
1984 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1985 "number of mdt service threads to start");
1987 cfs_module(mdt, "0.0.4", mdt_mod_init, mdt_mod_exit);