1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
14 * This file is part of the Lustre file system, http://www.lustre.org
15 * Lustre is a trademark of Cluster File Systems, Inc.
17 * You may have signed or agreed to another license before downloading
18 * this software. If so, you are bound by the terms and conditions
19 * of that agreement, and the following does not apply to you. See the
20 * LICENSE file included with this distribution for more information.
22 * If you did not agree to a different license, then this copy of Lustre
23 * is open source software; you can redistribute it and/or modify it
24 * under the terms of version 2 of the GNU General Public License as
25 * published by the Free Software Foundation.
27 * In either case, Lustre is distributed in the hope that it will be
28 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 * license text for more details.
34 # define EXPORT_SYMTAB
36 #define DEBUG_SUBSYSTEM S_MDS
38 #include <linux/module.h>
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
43 * struct OBD_{ALLOC,FREE}*()
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
54 /* struct mds_client_data */
55 #include "../mds/mds_internal.h"
56 #include "mdt_internal.h"
59 * Initialized in mdt_mod_init().
/* Number of service threads; exposed as a module parameter (see
 * CFS_MODULE_PARM at the bottom of this file). */
61 unsigned long mdt_num_threads;
/* Forward declarations: mdt_handle() is the ptlrpc service entry point,
 * mdt_thread_key provides per-service-thread mdt_thread_info storage. */
63 static int mdt_handle(struct ptlrpc_request *req);
64 static struct ptlrpc_thread_key mdt_thread_key;
66 /* object operations */
/*
 * Create directory @name with child fid @cfid under parent @pfid.
 * The parent is looked up and PW-locked (MDS_INODELOCK_UPDATE), then the
 * actual mkdir is delegated to the next (md) layer via ->moo_mkdir().
 * NOTE(review): several lines (braces, error branches) are elided in this
 * excerpt; the visible lines only sketch the flow.
 */
68 static int mdt_md_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
69 struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
72 struct mdt_object *child;
73 struct mdt_lock_handle *lh;
/* take a PW lock on the parent before modifying it */
77 lh = &info->mti_lh[MDT_LH_PARENT];
78 lh->mlh_mode = LCK_PW;
80 o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
/* NOTE(review): 2-arg call differs from the mdt_object_find(ctxt, d, f)
 * signature defined later in this file — confirm against full source. */
84 child = mdt_object_find(d, cfid);
86 struct md_object *next = mdt_object_child(o);
88 result = next->mo_ops->moo_mkdir(&info->mti_ctxt, next, name,
89 mdt_object_child(child));
90 mdt_object_put(child);
/* child lookup failed: extract the error from the ERR_PTR */
92 result = PTR_ERR(child);
93 mdt_object_unlock(d->mdt_namespace, o, lh);
/*
 * Look up the object @fid and fetch its attributes. The md layer stores
 * the attributes into info->mti_ctxt (mti_ctxt.lc_attr), from where the
 * caller (mdt_getattr()) packs them into the reply.
 */
98 static int mdt_md_getattr(struct mdt_thread_info *info, struct lu_fid *fid)
100 struct mdt_device *d = info->mti_mdt;
101 struct mdt_object *o;
106 o = mdt_object_find(&info->mti_ctxt, d, fid);
109 /* attr are in mti_ctxt */
111 mdt_object_put(&info->mti_ctxt, o);
/*
 * MDS_GETSTATUS handler: reply with an mdt_body carrying the filesystem
 * root fid, obtained from the underlying md device via ->mdo_root_get().
 */
116 static int mdt_getstatus(struct mdt_thread_info *info,
117 struct ptlrpc_request *req, int offset)
119 struct md_device *next = info->mti_mdt->mdt_child;
120 struct mdt_body *body;
121 int size = sizeof *body;
126 result = lustre_pack_reply(req, 1, &size, NULL);
128 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
130 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
133 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
134 result = next->md_ops->mdo_root_get(&info->mti_ctxt,
138 /* the last_committed and last_xid fields are filled in for all
139 * replies already - no need to do so here also.
/*
 * MDS_STATFS handler: pack an obd_statfs reply built from data returned
 * by the underlying md device's ->mdo_statfs().
 */
144 static int mdt_statfs(struct mdt_thread_info *info,
145 struct ptlrpc_request *req, int offset)
147 struct md_device *next = info->mti_mdt->mdt_child;
148 struct obd_statfs *osfs;
151 int size = sizeof(struct obd_statfs);
155 result = lustre_pack_reply(req, 1, &size, NULL);
157 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
159 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
160 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
163 osfs = lustre_msg_buf(req->rq_repmsg, 0, size);
164 /* XXX max_age optimisation is needed here. See mds_statfs */
165 result = next->md_ops->mdo_statfs(&info->mti_ctxt, next, &sfs);
166 statfs_pack(osfs, &sfs);
/*
 * Translate in-core lu_attr into the wire-format mdt_body and set the
 * corresponding OBD_MD_* valid bits. For regular files the
 * size/blocks/atime valid bits are deliberately not set here —
 * presumably those come from OST objects, as on the old MDS; confirm.
 */
172 static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
174 b->valid |= OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID |
175 OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
176 OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
178 if (!S_ISREG(attr->la_mode))
179 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
182 b->atime = attr->la_atime;
183 b->mtime = attr->la_mtime;
184 b->ctime = attr->la_ctime;
185 b->mode = attr->la_mode;
186 b->size = attr->la_size;
187 b->blocks = attr->la_blocks;
188 b->uid = attr->la_uid;
189 b->gid = attr->la_gid;
190 b->flags = attr->la_flags;
191 b->nlink = attr->la_nlink;
/*
 * MDS_GETATTR handler: look up the fid from the request body, fetch its
 * attributes via mdt_md_getattr() and pack them into the reply body.
 * NOTE(review): both CERROR messages below say "statfs" — copy-paste
 * leftovers from mdt_statfs(); the messages should mention getattr.
 */
194 static int mdt_getattr(struct mdt_thread_info *info,
195 struct ptlrpc_request *req, int offset)
197 struct mdt_body *body;
198 int size = sizeof (*body);
199 struct lu_attr *attr;
208 result = lustre_pack_reply(req, 1, &size, NULL);
210 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
212 else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
213 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
216 body = lustre_msg_buf(req->rq_repmsg, 0, size);
217 result = mdt_md_getattr(info, &body->fid1);
219 mdt_pack_attr2body(body, &info->mti_ctxt.lc_attr);
/*
 * MDS_SET_INFO handler: currently only understands the "fld_create" /
 * "fld_delete" keys, which are forwarded to the underlying md device.
 * NOTE(review): memcmp() with keylen > strlen(literal) reads past the
 * string literal; the usual idiom is keylen == strlen(...) or
 * memcmp(..., strlen(...)). Also this calls ->mdo_get_info() for a
 * *set*_info operation — verify against the md_ops contract.
 */
225 static int mdt_set_info(struct mdt_thread_info *info,
226 struct ptlrpc_request *req, int offset)
228 struct md_device *next = info->mti_mdt->mdt_child;
233 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
235 DEBUG_REQ(D_HA, req, "no set_info key");
238 keylen = req->rq_reqmsg->buflens[0];
240 if (((keylen >= strlen("fld_create") &&
241 memcmp(key, "fld_create", keylen) == 0)) ||
242 ((keylen >= strlen("fld_delete") &&
243 memcmp(key, "fld_delete", keylen) == 0))) {
244 struct md_fld mf, *p;
245 __u32 size = sizeof(struct md_fld);
247 rc = lustre_pack_reply(req, 0, NULL, NULL);
251 p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
253 rc = next->md_ops->mdo_get_info(&info->mti_ctxt, next, keylen,
258 CDEBUG(D_IOCTL, "invalid key\n");
/*
 * MDS_GET_INFO handler: only the "fld_get" key is supported; the swabbed
 * md_fld argument is passed down and the result packed into the reply.
 * NOTE(review): same memcmp-length concern as in mdt_set_info() above,
 * and the DEBUG_REQ message says "set_info" — copy-paste leftover.
 */
263 static int mdt_get_info(struct mdt_thread_info *info,
264 struct ptlrpc_request *req, int offset)
266 struct md_device *next = info->mti_mdt->mdt_child;
271 key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
273 DEBUG_REQ(D_HA, req, "no set_info key");
276 keylen = req->rq_reqmsg->buflens[0];
278 if (((keylen >= strlen("fld_get") &&
279 memcmp(key, "fld_get", keylen) == 0))) {
280 struct md_fld mf, *p, *reply;
281 int size = sizeof(*reply);
283 rc = lustre_pack_reply(req, 1, &size, NULL);
286 p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
288 rc = next->md_ops->mdo_get_info(&info->mti_ctxt, next, keylen,
290 reply = lustre_msg_buf(req->rq_repmsg, 0, size);
295 CDEBUG(D_IOCTL, "invalid key\n");
/* Forward declaration; the ops table itself is defined near the bottom. */
299 static struct lu_device_operations mdt_lu_ops;
/*
 * Return true iff @d is an mdt device, identified (for now) by its ops
 * vector pointing at mdt_lu_ops.
 */
301 static int lu_device_is_mdt(struct lu_device *d)
304 * XXX for now. Tags in lu_device_type->ldt_something are needed.
306 return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
/* Downcast lu_device -> mdt_device; asserts that @d really is an mdt. */
309 static struct mdt_device *mdt_dev(struct lu_device *d)
311 LASSERT(lu_device_is_mdt(d));
312 return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
/*
 * MDS_CONNECT handler: standard target connect, then allocate a fid
 * sequence for the new client and return it in obd_connect_data.
 */
315 static int mdt_connect(struct mdt_thread_info *info,
316 struct ptlrpc_request *req, int offset)
320 result = target_handle_connect(req, mdt_handle);
322 struct mdt_device *mdt = info->mti_mdt;
323 struct obd_connect_data *data;
325 data = lustre_msg_buf(req->rq_repmsg, 0, sizeof *data);
326 result = seq_mgr_alloc(&info->mti_ctxt,
327 mdt->mdt_seq_mgr, &data->ocd_seq);
/*
 * The handlers from here down to mdt_handle_quotactl() are stubs whose
 * bodies are elided in this excerpt; they are wired into the handler
 * tables at the bottom of the file.
 */
332 static int mdt_disconnect(struct mdt_thread_info *info,
333 struct ptlrpc_request *req, int offset)
338 static int mdt_getattr_name(struct mdt_thread_info *info,
339 struct ptlrpc_request *req, int offset)
344 static int mdt_setxattr(struct mdt_thread_info *info,
345 struct ptlrpc_request *req, int offset)
350 static int mdt_getxattr(struct mdt_thread_info *info,
351 struct ptlrpc_request *req, int offset)
356 static int mdt_readpage(struct mdt_thread_info *info,
357 struct ptlrpc_request *req, int offset)
362 static int mdt_reint(struct mdt_thread_info *info,
363 struct ptlrpc_request *req, int offset)
368 static int mdt_close(struct mdt_thread_info *info,
369 struct ptlrpc_request *req, int offset)
374 static int mdt_done_writing(struct mdt_thread_info *info,
375 struct ptlrpc_request *req, int offset)
380 static int mdt_pin(struct mdt_thread_info *info,
381 struct ptlrpc_request *req, int offset)
386 static int mdt_sync(struct mdt_thread_info *info,
387 struct ptlrpc_request *req, int offset)
392 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
393 struct ptlrpc_request *req, int offset)
398 static int mdt_handle_quotactl(struct mdt_thread_info *info,
399 struct ptlrpc_request *req, int offset)
/* DLM server-side AST callbacks used by mdt_enqueue() below. */
408 static struct ldlm_callback_suite cbs = {
409 .lcs_completion = ldlm_server_completion_ast,
410 .lcs_blocking = ldlm_server_blocking_ast,
/*
 * LDLM_ENQUEUE handler: mdt_req_handle() has already swabbed (and, when
 * needed, compat-converted) the dlm request into info->mti_dlm_req.
 */
414 static int mdt_enqueue(struct mdt_thread_info *info,
415 struct ptlrpc_request *req, int offset)
418 * info->mti_dlm_req already contains swapped and (if necessary)
419 * converted dlm request.
421 LASSERT(info->mti_dlm_req);
423 info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
424 return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
/* LDLM_CONVERT handler: delegate to the generic ldlm code. */
427 static int mdt_convert(struct mdt_thread_info *info,
428 struct ptlrpc_request *req, int offset)
430 LASSERT(info->mti_dlm_req);
431 return ldlm_handle_convert0(req, info->mti_dlm_req);
/* Blocking callbacks are client-side only; receiving one here is a bug. */
434 static int mdt_bl_callback(struct mdt_thread_info *info,
435 struct ptlrpc_request *req, int offset)
437 CERROR("bl callbacks should not happen on MDS\n");
/* Completion callbacks are client-side only; receiving one here is a bug. */
442 static int mdt_cp_callback(struct mdt_thread_info *info,
443 struct ptlrpc_request *req, int offset)
445 CERROR("cp callbacks should not happen on MDS\n");
451 * Build (DLM) resource name from fid.
453 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
454 struct ldlm_res_id *name)
456 memset(name, 0, sizeof *name);
/* we use fid_num(), which includes the object version, instead of the
 * raw object id */
459 name->name[0] = fid_seq(f);
460 name->name[1] = fid_num(f);
465 * Return true if resource is for object identified by fid.
467 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
469 return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
472 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
473 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
474 struct lustre_handle *lh, ldlm_mode_t mode,
475 ldlm_policy_data_t *policy)
477 struct ldlm_res_id res_id;
485 /* FIXME: is that correct to have @flags=0 here? */
/* server-local enqueue: no client connection, blocking/completion ASTs
 * are the generic ldlm ones */
486 rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
487 LDLM_IBITS, policy, mode, &flags,
488 ldlm_blocking_ast, ldlm_completion_ast, NULL,
489 NULL, NULL, 0, NULL, lh);
/* collapse any ldlm error into -EIO for the caller */
490 RETURN (rc == ELDLM_OK ? 0 : -EIO);
/*
 * Release a lock taken by fid_lock(). The handle-to-lock round trip is
 * debug-only sanity checking that the lock matches @f.
 */
493 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
494 struct lustre_handle *lh, ldlm_mode_t mode)
496 struct ldlm_lock *lock;
499 /* FIXME: this is debug stuff, remove it later. */
500 lock = ldlm_handle2lock(lh);
502 CERROR("invalid lock handle "LPX64, lh->cookie);
506 LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
508 ldlm_lock_decref(lh, mode);
/* Downcast lu_object -> mdt_object; asserts the device is an mdt. */
512 static struct mdt_object *mdt_obj(struct lu_object *o)
514 LASSERT(lu_device_is_mdt(o->lo_dev));
515 return container_of(o, struct mdt_object, mot_obj.mo_lu);
/*
 * Find (or load) the object with fid @f in the site's object index.
 * Returns an mdt_object or ERR_PTR(); the reference is dropped with
 * mdt_object_put().
 */
518 struct mdt_object *mdt_object_find(struct lu_context *ctxt,
519 struct mdt_device *d,
524 o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
526 return (struct mdt_object *)o;
/* Drop a reference obtained by mdt_object_find(). */
531 void mdt_object_put(struct lu_context *ctxt, struct mdt_object *o)
533 lu_object_put(ctxt, &o->mot_obj.mo_lu);
/* Return the fid of @o. */
536 struct lu_fid *mdt_object_fid(struct mdt_object *o)
538 return lu_object_fid(&o->mot_obj.mo_lu);
/*
 * Take an IBITS dlm lock on @o with the mode recorded in @lh and the
 * inode bits @ibits. @lh must be fresh (unused handle, mode set).
 */
541 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
542 struct mdt_lock_handle *lh, __u64 ibits)
544 ldlm_policy_data_t p = {
549 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
550 LASSERT(lh->mlh_mode != LCK_MINMODE);
552 return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
/* Release the lock in @lh if it was taken; safe to call on unused @lh. */
555 void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
556 struct mdt_lock_handle *lh)
558 if (lustre_handle_is_used(&lh->mlh_lh)) {
559 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
/* mark the handle unused so a later unlock is a no-op */
560 lh->mlh_lh.cookie = 0;
/*
 * Combined find + lock: on lock failure the object reference is dropped
 * so the caller only ever owns a locked object or an error.
 */
564 struct mdt_object *mdt_object_find_lock(struct lu_context *ctxt,
565 struct mdt_device *d,
567 struct mdt_lock_handle *lh,
570 struct mdt_object *o;
572 o = mdt_object_find(ctxt, d, f);
576 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
578 mdt_object_put(ctxt, o);
/* Handler callback: part of struct mdt_handler (start elided above). */
590 int (*mh_act)(struct mdt_thread_info *info,
591 struct ptlrpc_request *req, int offset);
/*
 * Per-handler preprocessing flags, tested in mdt_req_handle().
 */
594 enum mdt_handler_flags {
596 * struct mdt_body is passed in the 0-th incoming buffer.
598 HABEO_CORPUS = (1 << 0),
600 * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
603 HABEO_CLAVIS = (1 << 1)
/* A contiguous range of opcodes mapped onto an array of handlers. */
606 struct mdt_opc_slice {
609 struct mdt_handler *mos_hs;
612 static struct mdt_opc_slice mdt_handlers[];
/*
 * Locate the handler for @opc by scanning the opcode slices. Returns
 * NULL for an unsupported opcode.
 */
614 static struct mdt_handler *mdt_handler_find(__u32 opc)
616 struct mdt_opc_slice *s;
617 struct mdt_handler *h;
620 for (s = mdt_handlers; s->mos_hs != NULL; s++) {
621 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
622 h = s->mos_hs + (opc - s->mos_opc_start);
624 LASSERT(h->mh_opc == opc);
626 h = NULL; /* unsupported opc */
/* Last xid seen from this export's client, from persistent client data. */
633 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
635 return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
/* 1.x client compatibility shims — not implemented yet. */
638 static int mdt_lock_resname_compat(struct mdt_device *m,
639 struct ldlm_request *req)
641 /* XXX something... later. */
645 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
647 /* XXX something... later. */
652 * Invoke handler for this request opc. Also do necessary preprocessing
653 * (according to handler ->mh_flags), and post-processing (setting of
654 * ->last_{xid,committed}).
656 static int mdt_req_handle(struct mdt_thread_info *info,
657 struct mdt_handler *h, struct ptlrpc_request *req,
666 LASSERT(h->mh_act != NULL);
667 LASSERT(h->mh_opc == req->rq_reqmsg->opc);
/* handlers must not leave a journal handle open across requests */
668 LASSERT(current->journal_info == NULL);
670 DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
/* fault-injection point for this opcode */
672 if (h->mh_fail_id != 0)
673 OBD_FAIL_RETURN(h->mh_fail_id, 0);
675 off = MDS_REQ_REC_OFF + shift;
/* old-client resource-name conversion needed? */
677 h->mh_flags & HABEO_CLAVIS &&
678 info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME;
/* HABEO_CORPUS: swab the mdt_body and pin the object it names */
681 if (h->mh_flags & HABEO_CORPUS) {
682 struct mdt_body *body;
684 body = info->mti_body =
685 lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
686 lustre_swab_mdt_body);
688 info->mti_object = mdt_object_find(&info->mti_ctxt,
691 if (IS_ERR(info->mti_object))
692 result = PTR_ERR(info->mti_object);
694 CERROR("Can't unpack body\n");
697 } else if (lock_conv) {
698 struct ldlm_request *dlm;
/* HABEO_CLAVIS: swab the dlm request and convert resource names */
701 dlm = info->mti_dlm_req =
702 lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
704 lustre_swab_ldlm_request);
706 result = mdt_lock_resname_compat(info->mti_mdt, dlm);
708 CERROR("Can't unpack dlm request\n");
/* dispatch to the per-opcode handler */
716 result = h->mh_act(info, req, off);
718 * XXX result value is unconditionally shoved into ->rq_status
719 * (original code sometimes placed error code into ->rq_status, and
720 * sometimes returned it to the
721 * caller). ptlrpc_server_handle_request() doesn't check return value
724 req->rq_status = result;
726 LASSERT(current->journal_info == NULL);
/* convert the ldlm reply back for old clients, mirroring the request */
729 struct ldlm_reply *rep;
731 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
733 result = mdt_lock_reply_compat(info->mti_mdt, rep);
736 /* If we're DISCONNECTing, the mds_export_data is already freed */
737 if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
738 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
739 target_committed_to_req(req);
/* Reset a lock handle to the pristine (unused) state. */
744 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
746 lh->mlh_lh.cookie = 0ull;
747 lh->mlh_mode = LCK_MINMODE;
/* All locks must have been released before the handle is finalized. */
750 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
752 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
/*
 * Per-request (re)initialization of the per-thread mdt_thread_info:
 * reset fail id, reply buffer sizes and lock handles, enter the context.
 */
755 static void mdt_thread_info_init(struct mdt_thread_info *info)
759 info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
/* poison the reply-buffer sizes so unset entries are obvious */
763 for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
764 info->mti_rep_buf_size[i] = ~0;
765 info->mti_rep_buf_nr = i;
766 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
767 mdt_lock_handle_init(&info->mti_lh[i]);
768 lu_context_enter(&info->mti_ctxt);
/*
 * Per-request teardown: exit the context, drop the pinned object (if
 * any) and check all lock handles were released.
 */
771 static void mdt_thread_info_fini(struct mdt_thread_info *info)
775 lu_context_exit(&info->mti_ctxt);
776 if (info->mti_object != NULL) {
777 mdt_object_put(&info->mti_ctxt, info->mti_object);
778 info->mti_object = NULL;
780 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
781 mdt_lock_handle_fini(&info->mti_lh[i]);
/*
 * Verify the message version for the given opcode family (OBD, MDS,
 * DLM, LLOG). The dispatch is a switch on msg->opc (case labels partly
 * elided in this excerpt).
 */
784 static int mds_msg_check_version(struct lustre_msg *msg)
788 /* TODO: enable the below check while really introducing msg version.
789 * it's disabled because it will break compatibility with b1_4.
797 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
799 CERROR("bad opc %u version %08x, expecting %08x\n",
800 msg->opc, msg->version, LUSTRE_OBD_VERSION);
804 case MDS_GETATTR_NAME:
809 case MDS_DONE_WRITING:
819 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
821 CERROR("bad opc %u version %08x, expecting %08x\n",
822 msg->opc, msg->version, LUSTRE_MDS_VERSION);
826 case LDLM_BL_CALLBACK:
827 case LDLM_CP_CALLBACK:
828 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
830 CERROR("bad opc %u version %08x, expecting %08x\n",
831 msg->opc, msg->version, LUSTRE_DLM_VERSION);
834 case LLOG_ORIGIN_HANDLE_CREATE:
835 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
836 case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
837 case LLOG_ORIGIN_HANDLE_READ_HEADER:
838 case LLOG_ORIGIN_HANDLE_CLOSE:
840 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
842 CERROR("bad opc %u version %08x, expecting %08x\n",
843 msg->opc, msg->version, LUSTRE_LOG_VERSION);
846 CERROR("MDS unknown opcode %d\n", msg->opc);
/*
 * Decide whether @req may be processed while the target is recovering:
 * connect/disconnect-class and sync requests pass through, replayable
 * requests are queued via target_queue_recovery_request(), everything
 * else is rejected with -EAGAIN.
 */
852 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
853 struct obd_device *obd, int *process)
855 switch (req->rq_reqmsg->opc) {
856 case MDS_CONNECT: /* This will never get here, but for completeness. */
857 case OST_CONNECT: /* This will never get here, but for completeness. */
864 case MDS_SYNC: /* used in unmounting */
868 *process = target_queue_recovery_request(req, obd);
872 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
874 /* XXX what should we set rq_status to here? */
875 req->rq_status = -EAGAIN;
876 RETURN(ptlrpc_error(req));
881 * Handle recovery. Return:
882 * +1: continue request processing;
883 * -ve: abort immediately with the given error code;
884 * 0: send reply with error code in req->rq_status;
886 static int mdt_recovery(struct ptlrpc_request *req)
890 struct obd_device *obd;
/* CONNECT is allowed before an export exists */
894 if (req->rq_reqmsg->opc == MDS_CONNECT)
897 if (req->rq_export == NULL) {
898 CERROR("operation %d on unconnected MDS from %s\n",
900 libcfs_id2str(req->rq_peer));
901 req->rq_status = -ENOTCONN;
905 /* sanity check: if the xid matches, the request must be marked as a
906 * resent or replayed */
907 LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
908 lustre_msg_get_flags(req->rq_reqmsg) &
909 (MSG_RESENT | MSG_REPLAY)),
910 "rq_xid "LPU64" matches last_xid, "
911 "expected RESENT flag\n", req->rq_xid);
913 /* else: note the opposite is not always true; a RESENT req after a
914 * failover will usually not match the last_xid, since it was likely
915 * never committed. A REPLAYed request will almost never match the
916 * last xid, however it could for a committed, but still retained,
919 obd = req->rq_export->exp_obd;
921 /* Check for aborted recovery... */
922 spin_lock_bh(&obd->obd_processing_task_lock);
923 abort_recovery = obd->obd_abort_recovery;
924 recovering = obd->obd_recovering;
925 spin_unlock_bh(&obd->obd_processing_task_lock);
926 if (abort_recovery) {
927 target_abort_recovery(obd);
928 } else if (recovering) {
932 rc = mdt_filter_recovery_request(req, obd, &should_process);
933 if (rc != 0 || !should_process) {
/*
 * Send the reply for @req. Final-replay replies during recovery are
 * queued via target_queue_final_reply() instead of being sent directly.
 */
941 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
943 struct obd_device *obd;
945 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
946 if (req->rq_reqmsg->opc != OBD_PING)
947 DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
949 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
950 if (obd && obd->obd_recovering) {
951 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
952 RETURN(target_queue_final_reply(req, req->rq_status));
954 /* Lost a race with recovery; let the error path
956 req->rq_status = -ENOTCONN;
959 target_send_reply(req, req->rq_status, info->mti_fail_id);
960 RETURN(req->rq_status);
/*
 * Top-level request processing: check message version, run the recovery
 * state machine, dispatch to the opcode handler and send the reply.
 */
963 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
965 struct mdt_handler *h;
966 struct lustre_msg *msg;
971 OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
973 LASSERT(current->journal_info == NULL);
975 msg = req->rq_reqmsg;
976 result = mds_msg_check_version(msg);
978 result = mdt_recovery(req);
981 h = mdt_handler_find(msg->opc);
983 result = mdt_req_handle(info, h, req, 0);
/* unknown opcode: report -ENOTSUPP to the client */
985 req->rq_status = -ENOTSUPP;
986 result = ptlrpc_error(req);
991 result = mdt_reply(req, info);
994 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
/*
 * ptlrpc service handler: fetch the per-thread mdt_thread_info, set it
 * up for this request, run mdt_handle0() and tear the info down again.
 */
998 static int mdt_handle(struct ptlrpc_request *req)
1002 struct mdt_thread_info *info = ptlrpc_thread_key_get(req->rq_svc_thread,
1006 mdt_thread_info_init(info);
1007 /* it can be NULL while CONNECT */
1009 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1010 info->mti_ctxt.lc_thread = req->rq_svc_thread;
1012 result = mdt_handle0(req, info);
1013 mdt_thread_info_fini(info);
/* Intent policy placeholder: all intent locks are currently aborted. */
1017 static int mdt_intent_policy(struct ldlm_namespace *ns,
1018 struct ldlm_lock **lockp, void *req_cookie,
1019 ldlm_mode_t mode, int flags, void *data)
1022 RETURN(ELDLM_LOCK_ABORTED);
/*
 * Convenience wrapper: start a ptlrpc service from a packed
 * ptlrpc_service_conf instead of the long ptlrpc_init_svc() arg list.
 */
1025 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
1026 svc_handler_t h, char *name,
1027 struct proc_dir_entry *proc_entry,
1028 svcreq_printfn_t prntfn)
1030 return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
1031 c->psc_max_req_size, c->psc_max_reply_size,
1032 c->psc_req_portal, c->psc_rep_portal,
1033 c->psc_watchdog_timeout,
1034 h, name, proc_entry,
1035 prntfn, c->psc_num_threads);
/* Pass a named config blob down to the underlying md device. */
1038 static int mdt_config(struct lu_context *ctx, struct mdt_device *m,
1039 const char *name, void *buf, int size, int mode)
1041 struct md_device *child = m->mdt_child;
1043 RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode));
/* Read or write (per @mode) the persistent metadata sequence value. */
1046 static int mdt_seq_mgr_hpr(struct lu_context *ctx, void *opaque, __u64 *seq,
1049 struct mdt_device *m = opaque;
1053 rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ,
1059 static int mdt_seq_mgr_read(struct lu_context *ctx, void *opaque, __u64 *seq)
1062 RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET));
1065 static int mdt_seq_mgr_write(struct lu_context *ctx, void *opaque, __u64 *seq)
1068 RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET));
/* Sequence-manager callbacks registered by mdt_init0(). */
1071 struct lu_seq_mgr_ops seq_mgr_ops = {
1072 .smo_read = mdt_seq_mgr_read,
1073 .smo_write = mdt_seq_mgr_write
/*
 * Tear down everything mdt_init0() set up, in reverse order: lu site,
 * ptlrpc service, ldlm namespace, child device stack, sequence manager,
 * and finally the md device itself.
 */
1076 static void mdt_fini(struct mdt_device *m)
1078 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1080 if (d->ld_site != NULL) {
1081 lu_site_fini(d->ld_site);
1082 OBD_FREE_PTR(d->ld_site);
1085 if (m->mdt_service != NULL) {
1086 ptlrpc_unregister_service(m->mdt_service);
1087 m->mdt_service = NULL;
1089 if (m->mdt_namespace != NULL) {
1090 ldlm_namespace_free(m->mdt_namespace, 0);
1091 m->mdt_namespace = NULL;
1093 /* finish the stack */
1095 struct lu_device *child = md2lu_dev(m->mdt_child);
1096 child->ld_type->ldt_ops->ldto_device_fini(child);
1099 if (m->mdt_seq_mgr) {
1100 seq_mgr_fini(m->mdt_seq_mgr);
1101 m->mdt_seq_mgr = NULL;
/* by now all object references must be gone */
1104 LASSERT(atomic_read(&d->ld_ref) == 0);
1105 md_device_fini(&m->mdt_md_dev);
/*
 * Device setup: wire up the child md device (named in lustre_cfg),
 * create the lu site, ldlm namespace and ptlrpc service, initialize the
 * device stack and sequence manager, then start the service threads.
 * Errors unwind through the err_* labels in reverse order of setup.
 */
1108 static int mdt_init0(struct mdt_device *m,
1109 struct lu_device_type *t, struct lustre_cfg *cfg)
1114 struct obd_device *obd;
1115 struct lu_device *mdt_child;
1116 const char *top = lustre_cfg_string(cfg, 0);
1117 const char *child = lustre_cfg_string(cfg, 1);
1118 struct lu_context ctx;
1122 /* get next layer */
1123 obd = class_name2obd((char *)child);
1124 if (obd && obd->obd_lu_dev) {
1125 CDEBUG(D_INFO, "Child device is %s\n", child);
1126 m->mdt_child = lu2md_dev(obd->obd_lu_dev);
1127 mdt_child = md2lu_dev(m->mdt_child);
1129 CDEBUG(D_INFO, "Child device %s is not found\n", child);
1137 md_device_init(&m->mdt_md_dev, t);
1138 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1139 lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
/* service configuration: sizes/portals come from the MDS defaults */
1141 m->mdt_service_conf.psc_nbufs = MDS_NBUFS;
1142 m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE;
1143 m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE;
1144 m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE;
1145 m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL;
1146 m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL;
1147 m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
1149 * We'd like to have a mechanism to set this on a per-device basis,
1152 m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
1155 snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1156 m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1157 if (m->mdt_namespace == NULL)
1158 GOTO(err_fini_site, rc = -ENOMEM);
1160 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1162 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1163 "mdt_ldlm_client", &m->mdt_ldlm_client);
1166 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1168 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1170 if (m->mdt_service == NULL)
1171 GOTO(err_free_ns, rc = -ENOMEM);
1173 /* init the stack */
1174 LASSERT(mdt_child->ld_type->ldt_ops->ldto_device_init != NULL);
1175 rc = mdt_child->ld_type->ldt_ops->ldto_device_init(mdt_child, top);
1177 CERROR("can't init device stack, rc %d\n", rc);
1178 GOTO(err_free_svc, rc);
1181 m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
1182 if (!m->mdt_seq_mgr) {
1183 CERROR("can't initialize sequence manager\n");
/* NOTE(review): rc is not assigned here — GOTO with stale rc; confirm
 * against full source that rc gets an error value on this path. */
1184 GOTO(err_fini_child, rc);
1187 rc = lu_context_init(&ctx);
1189 GOTO(err_fini_mgr, rc);
1191 lu_context_enter(&ctx);
1192 /* init sequence info after device stack is initialized. */
1193 rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
1194 lu_context_exit(&ctx);
1196 GOTO(err_fini_ctx, rc);
1198 rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1200 GOTO(err_fini_ctx, rc);
1202 lu_context_fini(&ctx);
/* error unwinding, reverse order of setup */
1206 lu_context_fini(&ctx);
1208 seq_mgr_fini(m->mdt_seq_mgr);
1209 m->mdt_seq_mgr = NULL;
1211 mdt_child->ld_type->ldt_ops->ldto_device_fini(mdt_child);
1213 ptlrpc_unregister_service(m->mdt_service);
1214 m->mdt_service = NULL;
1216 ldlm_namespace_free(m->mdt_namespace, 0);
1217 m->mdt_namespace = NULL;
/*
 * Allocate an mdt_object together with its lu_object_header and link
 * the object in as the top of the compound object.
 */
1224 static struct lu_object *mdt_object_alloc(struct lu_context *ctxt,
1225 struct lu_device *d)
1227 struct mdt_object *mo;
1231 struct lu_object *o;
1232 struct lu_object_header *h;
1234 o = &mo->mot_obj.mo_lu;
1235 h = &mo->mot_header;
1236 lu_object_header_init(h);
1237 lu_object_init(o, h, d);
1238 lu_object_add_top(h, o);
/*
 * Ask the child md device to allocate its layer of the compound object
 * and stack it below ours.
 */
1244 static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
1246 struct mdt_device *d = mdt_dev(o->lo_dev);
1247 struct lu_device *under;
1248 struct lu_object *below;
1250 under = &d->mdt_child->md_lu_dev;
1251 below = under->ld_ops->ldo_object_alloc(ctxt, under);
1252 if (below != NULL) {
1253 lu_object_add(o, below);
/* Free an mdt_object (header teardown; allocation release elided). */
1259 static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
1261 struct lu_object_header *h;
1265 lu_object_header_fini(h);
/* Called when the last reference is dropped; nothing to do (body elided). */
1268 static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
/* Debug printer for /proc object dumps. */
1272 static int mdt_object_print(struct lu_context *ctxt,
1273 struct seq_file *f, const struct lu_object *o)
1275 return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
/* lu-level object lifecycle operations for the mdt layer. */
1278 static struct lu_device_operations mdt_lu_ops = {
1279 .ldo_object_alloc = mdt_object_alloc,
1280 .ldo_object_init = mdt_object_init,
1281 .ldo_object_free = mdt_object_free,
1282 .ldo_object_release = mdt_object_release,
1283 .ldo_object_print = mdt_object_print
1286 /* mds_connect copy */
/*
 * obd-level connect: create an export for @cluuid and allocate its
 * persistent mds_client_data. On failure the export is disconnected and
 * the client data freed (error path partly elided in this excerpt).
 */
1287 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
1288 struct obd_uuid *cluuid,
1289 struct obd_connect_data *data)
1291 struct obd_export *exp;
1292 int rc, abort_recovery;
1293 struct mdt_device *mdt;
1294 struct mds_export_data *med;
1295 struct mds_client_data *mcd = NULL;
1299 if (!conn || !obd || !cluuid)
1302 mdt = mdt_dev(obd->obd_lu_dev);
1304 /* Check for aborted recovery. */
1305 spin_lock_bh(&obd->obd_processing_task_lock);
1306 abort_recovery = obd->obd_abort_recovery;
1307 spin_unlock_bh(&obd->obd_processing_task_lock);
1309 target_abort_recovery(obd);
1311 /* XXX There is a small race between checking the list and adding a
1312 * new connection for the same UUID, but the real threat (list
1313 * corruption when multiple different clients connect) is solved.
1315 * There is a second race between adding the export to the list,
1316 * and filling in the client data below. Hence skipping the case
1317 * of NULL mcd above. We should already be controlling multiple
1318 * connects at the client, and we can't hold the spinlock over
1319 * memory allocations without risk of deadlocking.
1321 rc = class_connect(conn, obd, cluuid);
1324 exp = class_conn2export(conn);
1326 med = &exp->exp_mds_data;
1328 OBD_ALLOC(mcd, sizeof(*mcd));
1330 GOTO(out, rc = -ENOMEM);
1332 memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
/* error path: undo the client-data allocation and the connect */
1338 OBD_FREE(mcd, sizeof(*mcd));
1339 med->med_mcd = NULL;
1341 class_disconnect(exp);
1343 class_export_put(exp);
/* obd_ops vector: only connect is implemented at this layer so far. */
1349 static struct obd_ops mdt_obd_device_ops = {
1350 .o_owner = THIS_MODULE,
1351 .o_connect = mdt_obd_connect
/*
 * lu_device_type hook: allocate an mdt_device and run mdt_init0() on
 * it; on failure the error is returned as an ERR_PTR.
 */
1354 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1355 struct lustre_cfg *cfg)
1357 struct lu_device *l;
1358 struct mdt_device *m;
1364 l = &m->mdt_md_dev.md_lu_dev;
1365 result = mdt_init0(m, t, cfg);
1368 return ERR_PTR(result);
1372 l = ERR_PTR(-ENOMEM);
/* Counterpart of mdt_device_alloc(): fini and free (free line elided). */
1376 static void mdt_device_free(struct lu_device *d)
1378 struct mdt_device *m = mdt_dev(d);
/*
 * Per-service-thread constructor: allocate the mdt_thread_info and
 * initialize its lu_context; returns ERR_PTR on failure.
 */
1384 static void *mdt_thread_init(struct ptlrpc_thread *t)
1386 struct mdt_thread_info *info;
1389 OBD_ALLOC_PTR(info);
1391 result = lu_context_init(&info->mti_ctxt);
1395 info = ERR_PTR(result);
/* Per-service-thread destructor (OBD_FREE of info elided). */
1399 static void mdt_thread_fini(struct ptlrpc_thread *t, void *data)
1401 struct mdt_thread_info *info = data;
1402 lu_context_fini(&info->mti_ctxt);
/* Thread-local-storage key registered in mdt_type_init(). */
1406 static struct ptlrpc_thread_key mdt_thread_key = {
1407 .ptk_init = mdt_thread_init,
1408 .ptk_fini = mdt_thread_fini
/* Device-type init/fini: register/unregister the thread key. */
1411 static int mdt_type_init(struct lu_device_type *t)
1413 return ptlrpc_thread_key_register(&mdt_thread_key);
1416 static void mdt_type_fini(struct lu_device_type *t)
1420 static struct lu_device_type_operations mdt_device_type_ops = {
1421 .ldto_init = mdt_type_init,
1422 .ldto_fini = mdt_type_fini,
1424 .ldto_device_alloc = mdt_device_alloc,
1425 .ldto_device_free = mdt_device_free
1428 static struct lu_device_type mdt_device_type = {
1429 .ldt_tags = LU_DEVICE_MD,
1430 .ldt_name = LUSTRE_MDT0_NAME,
1431 .ldt_ops = &mdt_device_type_ops
/* /proc variable tables — currently empty placeholders. */
1434 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
1438 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
1442 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
/* Module entry: register the mdt obd type with the class layer. */
1444 static int __init mdt_mod_init(void)
1446 struct lprocfs_static_vars lvars;
1448 mdt_num_threads = MDT_NUM_THREADS;
1449 lprocfs_init_vars(mdt, &lvars);
1450 return class_register_type(&mdt_obd_device_ops, lvars.module_vars,
1451 LUSTRE_MDT0_NAME, &mdt_device_type);
1454 static void __exit mdt_mod_exit(void)
1456 class_unregister_type(LUSTRE_MDT0_NAME);
/*
 * Build one mdt_handler table entry, indexed by opcode offset from the
 * family's base opcode; @suffix selects the OBD_FAIL id naming scheme.
 */
1460 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn) \
1461 [prefix ## _ ## opc - prefix ## _ ## base] = { \
1463 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
1464 .mh_opc = prefix ## _ ## opc, \
1465 .mh_flags = flags, \
1469 #define DEF_MDT_HNDL(flags, name, fn) \
1470 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
/* Handlers for the MDS opcode family (MDS_GETATTR..MDS_LAST_OPC). */
1472 static struct mdt_handler mdt_mds_ops[] = {
1473 DEF_MDT_HNDL(0, CONNECT, mdt_connect),
1474 DEF_MDT_HNDL(0, DISCONNECT, mdt_disconnect),
1475 DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus),
1476 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mdt_getattr),
1477 DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mdt_getattr_name),
1478 DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mdt_setxattr),
1479 DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mdt_getxattr),
1480 DEF_MDT_HNDL(0, STATFS, mdt_statfs),
1481 DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mdt_readpage),
1482 DEF_MDT_HNDL(0, REINT, mdt_reint),
1483 DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mdt_close),
1484 DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
1485 DEF_MDT_HNDL(0, PIN, mdt_pin),
1486 DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync),
1487 DEF_MDT_HNDL(0, SET_INFO, mdt_set_info),
1488 DEF_MDT_HNDL(0, GET_INFO, mdt_get_info),
1489 DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck),
1490 DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl)
/* OBD opcode family — no handlers yet. */
1493 static struct mdt_handler mdt_obd_ops[] = {
1496 #define DEF_DLM_HNDL(flags, name, fn) \
1497 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
/* DLM opcode family (LDLM_ENQUEUE..LDLM_LAST_OPC). */
1499 static struct mdt_handler mdt_dlm_ops[] = {
1500 DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
1501 DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT, mdt_convert),
1502 DEF_DLM_HNDL(0, BL_CALLBACK, mdt_bl_callback),
1503 DEF_DLM_HNDL(0, CP_CALLBACK, mdt_cp_callback)
/* LLOG opcode family — no handlers yet. */
1506 static struct mdt_handler mdt_llog_ops[] = {
/* Opcode-range dispatch table consulted by mdt_handler_find(). */
1509 static struct mdt_opc_slice mdt_handlers[] = {
1511 .mos_opc_start = MDS_GETATTR,
1512 .mos_opc_end = MDS_LAST_OPC,
1513 .mos_hs = mdt_mds_ops
1516 .mos_opc_start = OBD_PING,
1517 .mos_opc_end = OBD_LAST_OPC,
1518 .mos_hs = mdt_obd_ops
1521 .mos_opc_start = LDLM_ENQUEUE,
1522 .mos_opc_end = LDLM_LAST_OPC,
1523 .mos_hs = mdt_dlm_ops
1526 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1527 .mos_opc_end = LLOG_LAST_OPC,
1528 .mos_hs = mdt_llog_ops
1535 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1536 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1537 MODULE_LICENSE("GPL");
1539 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1540 "number of mdt service threads to start");
1542 cfs_module(mdt, "0.0.4", mdt_mod_init, mdt_mod_exit);