1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/mdt/mdt_handler.c
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
13 * Author: Huang Hua <huanghua@clusterfs.com>
15 * This file is part of the Lustre file system, http://www.lustre.org
16 * Lustre is a trademark of Cluster File Systems, Inc.
18 * You may have signed or agreed to another license before downloading
19 * this software. If so, you are bound by the terms and conditions
20 * of that agreement, and the following does not apply to you. See the
21 * LICENSE file included with this distribution for more information.
23 * If you did not agree to a different license, then this copy of Lustre
24 * is open source software; you can redistribute it and/or modify it
25 * under the terms of version 2 of the GNU General Public License as
26 * published by the Free Software Foundation.
28 * In either case, Lustre is distributed in the hope that it will be
29 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 * license text for more details.
35 # define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_MDS
39 #include <linux/module.h>
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
44 * struct OBD_{ALLOC,FREE}*()
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
55 #include <dt_object.h>
56 #include "mdt_internal.h"
59 * Initialized in mdt_mod_init().
/* Module-wide thread count setting.
 * NOTE(review): its exact meaning is not visible in this part of the file;
 * the neighbouring comment says it is set up in mdt_mod_init() - confirm. */
61 unsigned long mdt_num_threads;
/* Descriptor of a single request handler. mdt_req_handle() installs mh_fmt
 * on the request capsule when it is non-NULL and then invokes mh_act with
 * the per-thread request state. */
63 /* ptlrpc request handler for MDT. All handlers are
64 * grouped into several slices - struct mdt_opc_slice,
65 * and stored in an array - mdt_handlers[].
68 /* The name of this handler. */
70 /* Fail id for this handler, checked at the beginning of this handler.*/
72 /* Operation code for this handler */
74 /* flags are listed in enum mdt_handler_flags below. */
76 /* The actual handler function to execute. */
77 int (*mh_act)(struct mdt_thread_info *info);
78 /* Request format for this request. */
79 const struct req_format *mh_fmt;
/* Bit flags kept per handler that select generic pre-processing.
 * HABEO_CORPUS is tested in mdt_body_unpack(), HABEO_REFERO in
 * mdt_unpack_req_pack_rep() and HABEO_CLAVIS in mdt_req_handle(). */
82 enum mdt_handler_flags {
84 * struct mdt_body is passed in the incoming message, and object
85 * identified by this fid exists on disk.
87 HABEO_CORPUS = (1 << 0),
89 * struct ldlm_request is passed in the incoming message.
91 HABEO_CLAVIS = (1 << 1),
93 * this request has fixed reply format, so that reply message can be
94 * packed by generic code.
96 HABEO_REFERO = (1 << 2)
/* One contiguous opcode range together with the handler array serving it.
 * mdt_handler_find() indexes mos_hs by (opc - mos_opc_start) and stops
 * scanning the slice table at the first entry whose mos_hs is NULL. */
99 struct mdt_opc_slice {
102 struct mdt_handler *mos_hs;
/* Forward declarations. mdt_handle(), mdt_dev() and
 * mdt_unpack_req_pack_rep() are defined later in this file; the definitions
 * of the remaining objects are not part of the visible section. */
105 static struct mdt_opc_slice mdt_handlers[];
107 static int mdt_handle (struct ptlrpc_request *req);
108 static struct mdt_device *mdt_dev (struct lu_device *d);
109 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
111 static struct lu_context_key mdt_thread_key;
112 static struct lu_object_operations mdt_obj_ops;
/* Request handler: calls mdo_root_get() on the child md device and sets
 * OBD_MD_FLID in the reply mdt_body.
 * @info: per-thread request state (capsule, context, owning mdt device). */
115 static int mdt_getstatus(struct mdt_thread_info *info)
117 struct md_device *next = info->mti_mdt->mdt_child;
119 struct mdt_body *body;
/* fail-check hook keyed on OBD_FAIL_MDS_GETSTATUS_PACK */
123 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
126 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
127 result = next->md_ops->mdo_root_get(info->mti_ctxt,
130 body->valid |= OBD_MD_FLID;
/* Request handler: obtains filesystem statistics from the child md device
 * into info->mti_sfs and packs them into the reply obd_statfs buffer.
 * @info: per-thread request state. */
136 static int mdt_statfs(struct mdt_thread_info *info)
138 struct md_device *next = info->mti_mdt->mdt_child;
139 struct obd_statfs *osfs;
144 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
147 osfs = req_capsule_server_get(&info->mti_pill, &RMF_OBD_STATFS);
148 /* XXX max_age optimisation is needed here. See mds_statfs */
149 result = next->md_ops->mdo_statfs(info->mti_ctxt,
150 next, &info->mti_sfs);
/* convert the in-memory statistics into the reply buffer */
151 statfs_pack(osfs, &info->mti_sfs);
/* Copy object attributes into a wire mdt_body and flag them as valid.
 * @b:    body to fill; bits are OR-ed into b->valid, never cleared here.
 * @attr: source attributes.
 * @fid:  object fid. NOTE(review): the lines that store it are not visible
 *        here; only the setting of OBD_MD_FLID is - confirm the handling of
 *        a NULL fid.
 * The size, blocks and atime group of valid bits is added only when the
 * object is not a regular file (the !S_ISREG test below), while the field
 * values themselves are copied unconditionally. */
157 void mdt_pack_attr2body(struct mdt_body *b,
158 struct lu_attr *attr,
159 const struct lu_fid *fid)
161 b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID |
162 OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
163 OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
165 if (!S_ISREG(attr->la_mode))
166 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
169 b->atime = attr->la_atime;
170 b->mtime = attr->la_mtime;
171 b->ctime = attr->la_ctime;
172 b->mode = attr->la_mode;
173 b->size = attr->la_size;
174 b->blocks = attr->la_blocks;
175 b->uid = attr->la_uid;
176 b->gid = attr->la_gid;
177 b->flags = attr->la_flags;
178 b->nlink = attr->la_nlink;
182 b->valid |= OBD_MD_FLID;
/* Non-zero when the request body asks for the EA that matches the object
 * type: OBD_MD_FLEASIZE for a regular file, OBD_MD_FLDIREA for a directory.
 * @la:   attributes of the object (only la_mode is read).
 * @body: request body (only the valid bit mask is read). */
186 static inline int mdt_body_has_lov(const struct lu_attr *la,
187 const struct mdt_body *body)
189 return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
190 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
/* Build the getattr reply for object @o.
 * Visible steps: read attributes with mo_attr_get(); size the optional
 * reply buffers (RMF_MDT_MD for the LOV EA or a symlink target, RMF_EADATA
 * for the access ACL when CONFIG_FS_POSIX_ACL is defined); pack the reply;
 * copy attributes into the reply body; then fetch the requested extended
 * attributes into the buffers sized earlier.
 * @info: per-thread request state; info->mti_body is the request body.
 * @o:    object whose attributes are reported. */
193 static int mdt_getattr_internal(struct mdt_thread_info *info,
194 struct mdt_object *o)
196 struct md_object *next = mdt_object_child(o);
197 const struct mdt_body *reqbody = info->mti_body;
198 struct ptlrpc_request *req = mdt_info_req(info);
199 struct lu_attr *la = &info->mti_attr;
200 struct req_capsule *pill = &info->mti_pill;
201 const struct lu_context *ctxt = info->mti_ctxt;
202 struct mdt_body *repbody;
208 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
212 rc = mo_attr_get(ctxt, next, la);
214 CERROR("getattr error for "DFID3": %d\n",
215 PFID3(mdt_object_fid(o)), rc);
219 /* pre-getattr: to guess how many bytes we need. */
220 if (mdt_body_has_lov(la, reqbody)) {
221 /* this should return the total length, or error */
222 rc = mo_xattr_get(ctxt, next, NULL, 0, XATTR_NAME_LOV);
224 CDEBUG(D_INODE, "got %d(max=%d) bytes MD data for "DFID3"\n",
225 rc, info->mti_mdt->mdt_max_mdsize,
226 PFID3(mdt_object_fid(o)));
228 if (rc != -ENODATA && rc != -EOPNOTSUPP) {
232 } else if (rc > info->mti_mdt->mdt_max_mdsize) {
235 } else if (S_ISLNK(la->la_mode) && (reqbody->valid & OBD_MD_LINKNAME)) {
236 /* NB: It also uses the mdt_md to hold symname. */
237 rc = min_t(int, la->la_size + 1, reqbody->eadatasize);
/* rc is used as the size of the RMF_MDT_MD reply buffer */
239 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
241 #ifdef CONFIG_FS_POSIX_ACL
242 if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
243 (reqbody->valid & OBD_MD_FLACL)) {
/* size query only: NULL buffer and zero length */
245 rc = mo_xattr_get(ctxt, next, NULL, 0, XATTR_NAME_ACL_ACCESS);
247 if (rc != -ENODATA && rc != -EOPNOTSUPP) {
248 CERROR("got acl size: %d\n", rc);
253 req_capsule_set_size(pill, &RMF_EADATA, RCL_SERVER, rc);
256 rc = req_capsule_pack(pill);
258 CERROR("lustre pack reply for getattr failed: rc %d\n", rc);
262 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
263 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
265 buffer = req_capsule_server_get(pill, &RMF_MDT_MD);
266 length = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER);
269 if (mdt_body_has_lov(la, reqbody)) {
271 rc = mo_xattr_get(info->mti_ctxt, next,
272 buffer, length, XATTR_NAME_LOV);
278 if (S_ISDIR(la->la_mode))
279 repbody->valid |= OBD_MD_FLDIREA;
281 repbody->valid |= OBD_MD_FLEASIZE;
282 repbody->eadatasize = rc;
283 } else if (S_ISLNK(la->la_mode) &&
284 (reqbody->valid & OBD_MD_LINKNAME) != 0) {
285 /* FIXME How to readlink??
286 rc = mo_xattr_get(info->mti_ctxt, next,
287 buffer, length, "readlink");
290 CERROR("readlink failed: %d\n", rc);
293 repbody->valid |= OBD_MD_LINKNAME;
294 repbody->eadatasize = rc + 1;
295 ((char*)buffer)[rc] = 0; /* NULL terminate */
296 CDEBUG(D_INODE, "symlink dest %s\n", (char*)buffer);
/* report the device-wide EA and cookie size limits when asked for */
300 if (reqbody->valid & OBD_MD_FLMODEASIZE) {
301 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
302 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
303 repbody->valid |= OBD_MD_FLMODEASIZE;
306 #ifdef CONFIG_FS_POSIX_ACL
307 if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
308 (reqbody->valid & OBD_MD_FLACL)) {
309 buffer = req_capsule_server_get(&info->mti_pill,
311 length = req_capsule_get_size(&info->mti_pill,
315 rc = mo_xattr_get(info->mti_ctxt, next, buffer,
316 length, XATTR_NAME_ACL_ACCESS);
321 CERROR("got acl size: %d\n", rc);
324 repbody->aclsize = rc;
325 repbody->valid |= OBD_MD_FLACL;
/* Getattr request handler: asserts that info->mti_object is set and that
 * the object exists, then delegates to mdt_getattr_internal().
 * @info: per-thread request state. */
331 static int mdt_getattr(struct mdt_thread_info *info)
335 LASSERT(info->mti_object != NULL);
336 LASSERT(lu_object_assert_exists(info->mti_ctxt,
337 &info->mti_object->mot_obj.mo_lu));
340 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
343 result = mdt_getattr_internal(info, info->mti_object);
349 * UPDATE lock should be taken against parent, and be released before exit;
350 * child_bits lock should be taken against child, and be returned back:
351 * (1)normal request should release the child lock;
352 * (2)intent request will grant the lock to client.
/* Look up the name carried in the request under info->mti_object and fill
 * the getattr reply for the child found.
 * @info: per-thread request state; info->mti_object is the parent.
 * @lhc:  lock handle that receives the child lock (mode set to LCK_CR).
 * The third parameter is declared on a line not shown here; the body passes
 * child_bits to mdt_object_find_lock() as the inodebits for the child.
 * The parent is locked with LCK_CR on MDS_INODELOCK_UPDATE and unlocked on
 * the last visible line. When mdo_lookup() reports -EREMOTE the reply is
 * packed and the fid found is stored in the reply body. */
354 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
355 struct mdt_lock_handle *lhc,
358 struct mdt_object *parent = info->mti_object;
359 struct mdt_object *child;
360 struct md_object *next = mdt_object_child(info->mti_object);
/* the looked-up fid lives in per-thread scratch space */
361 struct lu_fid *child_fid = &info->mti_tmp_fid1;
364 struct mdt_lock_handle *lhp;
367 LASSERT(info->mti_object != NULL);
369 name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
373 /*step 1: lock parent */
374 lhp = &info->mti_lh[MDT_LH_PARENT];
375 lhp->mlh_mode = LCK_CR;
376 result = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
380 /*step 2: lookup child's fid by name */
381 result = mdo_lookup(info->mti_ctxt, next, name, child_fid);
382 if (result == -EREMOTE) {
383 /* This object is located on remote node */
384 req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD,
386 #ifdef CONFIG_FS_POSIX_ACL
387 req_capsule_set_size(&info->mti_pill, &RMF_EADATA,
390 result = req_capsule_pack(&info->mti_pill);
392 struct mdt_body *repbody;
393 repbody = req_capsule_server_get(&info->mti_pill,
395 repbody->fid1 = *child_fid;
397 GOTO(out_parent, result);
400 GOTO(out_parent, result);
402 /*step 3: find the child object by fid & lock it*/
403 lhc->mlh_mode = LCK_CR;
404 child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
406 GOTO(out_parent, result = PTR_ERR(child));
408 /* finally, we can get attr for child. */
409 result = mdt_getattr_internal(info, child);
411 mdt_object_unlock(info, child, lhc);
412 mdt_object_put(info->mti_ctxt, child);
416 mdt_object_unlock(info, parent, lhp);
420 /* normal handler: should release the child lock */
/* Getattr-by-name request handler: runs mdt_getattr_name_lock() with
 * MDS_INODELOCK_UPDATE on the child and drops the child lock reference when
 * the handle is still in use afterwards.
 * @info: per-thread request state. */
421 static int mdt_getattr_name(struct mdt_thread_info *info)
423 struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
428 rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE);
429 if (lustre_handle_is_used(&lhc->mlh_lh))
430 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
/* Forward declaration of the mdt device operations table; its address
 * serves as the identity check below. */
434 static struct lu_device_operations mdt_lu_ops;
/* Test whether @d is an mdt device by comparing its operations table with
 * mdt_lu_ops.
 * NOTE(review): ergo() is presumably logical implication, in which case a
 * NULL device or a NULL ops table also yields true - confirm. */
436 static int lu_device_is_mdt(struct lu_device *d)
438 return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
/* Convert a generic lu_device pointer into the enclosing mdt_device.
 * Asserts that the device passes lu_device_is_mdt().
 * @d: device embedded at mdt_md_dev.md_lu_dev inside a struct mdt_device. */
441 static struct mdt_device *mdt_dev(struct lu_device *d)
443 LASSERT(lu_device_is_mdt(d));
444 return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
/* Connect request handler: hands the request to target_handle_connect()
 * with mdt_handle as the handler argument, then records the mdt device of
 * the export in info->mti_mdt.
 * @info: per-thread request state. */
447 static int mdt_connect(struct mdt_thread_info *info)
450 struct ptlrpc_request *req;
452 req = mdt_info_req(info);
453 result = target_handle_connect(req, mdt_handle);
455 LASSERT(req->rq_export != NULL);
456 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
/* Disconnect request handler: thin wrapper that forwards the request to
 * target_handle_disconnect() and returns its result.
 * @info: per-thread request state. */
461 static int mdt_disconnect(struct mdt_thread_info *info)
463 return target_handle_disconnect(mdt_info_req(info));
/* NOTE(review): presumably the handler for readpage requests; no body line
 * is visible in this listing, so its behaviour must be confirmed. */
466 static int mdt_readpage(struct mdt_thread_info *info)
/* Common reint path: unpack the reint record for operation @op with
 * mdt_reint_unpack(), then execute it with mdt_reint_rec().
 * @info: per-thread request state.
 * @op:   reint operation code.
 * OBD_FAIL_RETURN is invoked first with OBD_FAIL_MDS_REINT_UNPACK and
 * -EFAULT. */
471 static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
476 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_UNPACK, -EFAULT);
478 rc = mdt_reint_unpack(info, op);
480 rc = mdt_reint_rec(info);
/* Read the reint opcode from the request and, when @fmt has a format for
 * it, extend the request capsule with that format.
 * @info: per-thread request state.
 * @fmt:  table of request formats indexed by reint opcode; callers in this
 *        file pass arrays of REINT_MAX entries.
 * An opcode without a table entry is reported with CERROR.
 * NOTE(review): opc is printed with %ld, so it is presumably a signed long.
 * A negative value would pass the opc < REINT_MAX test and index fmt[] out
 * of bounds; consider also requiring opc >= 0. */
486 static long mdt_reint_opcode(struct mdt_thread_info *info,
487 const struct req_format **fmt)
493 ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
496 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
497 if (opc < REINT_MAX && fmt[opc] != NULL)
498 req_capsule_extend(&info->mti_pill, fmt[opc]);
500 CERROR("Unsupported opc: %ld\n", opc);
/* Reint request handler: determines the reint opcode with
 * mdt_reint_opcode() using the table below, packs the reply and runs
 * mdt_reint_internal().
 * @info: per-thread request state.
 * Table slots without an initializer stay NULL, which mdt_reint_opcode()
 * reports as unsupported. */
505 static int mdt_reint(struct mdt_thread_info *info)
510 static const struct req_format *reint_fmts[REINT_MAX] = {
511 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
512 [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
513 [REINT_LINK] = &RQF_MDS_REINT_LINK,
514 [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
515 [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
516 [REINT_OPEN] = &RQF_MDS_REINT_OPEN
521 opc = mdt_reint_opcode(info, reint_fmts);
523 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
525 rc = req_capsule_pack(&info->mti_pill);
527 rc = mdt_reint_internal(info, opc);
533 /* TODO these two methods not available now. */
535 /* this should sync the whole device */
/* Called by mdt_sync() when the fid sequence in the request is zero.
 * NOTE(review): no body line is visible here, and a nearby TODO says the
 * sync methods are not available yet - confirm what this returns. */
536 static int mdt_device_sync(struct mdt_thread_info *info)
541 /* this should sync this object */
/* Called by mdt_sync() on the path that unpacks the request body.
 * NOTE(review): no body line is visible here, and a nearby TODO says the
 * sync methods are not available yet - confirm what this returns. */
542 static int mdt_object_sync(struct mdt_thread_info *info)
/* Sync request handler. The request format is installed by hand because
 * the fid in the body may be zero. A zero fid sequence selects a whole
 * device sync; the other visible path unpacks the body with HABEO_CORPUS and
 * HABEO_REFERO, syncs the object, re-reads its attributes and packs them
 * into the reply body.
 * @info: per-thread request state. */
547 static int mdt_sync(struct mdt_thread_info *info)
549 struct req_capsule *pill = &info->mti_pill;
550 struct mdt_body *body;
554 /* The fid may be zero, so we req_capsule_set manually */
555 req_capsule_set(pill, &RQF_MDS_SYNC);
557 body = req_capsule_client_get(pill, &RMF_MDT_BODY);
561 if (fid_seq(&body->fid1) == 0) {
562 /* sync the whole device */
563 rc = req_capsule_pack(pill);
565 rc = mdt_device_sync(info);
568 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS | HABEO_REFERO);
570 rc = mdt_object_sync(info);
572 struct md_object *next;
573 const struct lu_fid *fid;
574 next = mdt_object_child(info->mti_object);
575 fid = mdt_object_fid(info->mti_object);
576 rc = mo_attr_get(info->mti_ctxt,
/* body is re-pointed from the request body to the reply body here */
580 body = req_capsule_server_get(pill,
582 mdt_pack_attr2body(body,
/* NOTE(review): presumably the quotacheck request handler; no body line is
 * visible in this listing - confirm its behaviour. */
592 static int mdt_handle_quotacheck(struct mdt_thread_info *info)
/* NOTE(review): presumably the quotactl request handler; no body line is
 * visible in this listing - confirm its behaviour. */
597 static int mdt_handle_quotactl(struct mdt_thread_info *info)
603 * OBD PING and other handlers.
/* Ping request handler: forwards the request to target_handle_ping().
 * @info: per-thread request state. */
606 static int mdt_obd_ping(struct mdt_thread_info *info)
610 result = target_handle_ping(mdt_info_req(info));
/* NOTE(review): presumably the log-cancel request handler; no body line is
 * visible in this listing - confirm its behaviour. */
614 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
/* NOTE(review): presumably the quota-check callback request handler; no
 * body line is visible in this listing - confirm its behaviour. */
619 static int mdt_obd_qc_callback(struct mdt_thread_info *info)
/* Callback suite passed to ldlm_handle_enqueue0() by mdt_enqueue(): the
 * server-side completion and blocking ASTs. Any further members are not
 * visible in this listing. */
629 static struct ldlm_callback_suite cbs = {
630 .lcs_completion = ldlm_server_completion_ast,
631 .lcs_blocking = ldlm_server_blocking_ast,
/* Lock enqueue request handler: passes the already unpacked dlm request to
 * ldlm_handle_enqueue0() on the mdt namespace with the cbs callback suite.
 * Also switches the fail id used for the reply to OBD_FAIL_LDLM_REPLY.
 * @info: per-thread request state; info->mti_dlm_req must be set. */
635 static int mdt_enqueue(struct mdt_thread_info *info)
638 * info->mti_dlm_req already contains swapped and (if necessary)
639 * converted dlm request.
641 LASSERT(info->mti_dlm_req != NULL);
643 info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
644 return ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
646 info->mti_dlm_req, &cbs);
/* Lock convert request handler: forwards the request and the unpacked dlm
 * request to ldlm_handle_convert0() and returns its result.
 * @info: per-thread request state; info->mti_dlm_req must be set. */
649 static int mdt_convert(struct mdt_thread_info *info)
651 LASSERT(info->mti_dlm_req);
652 return ldlm_handle_convert0(mdt_info_req(info), info->mti_dlm_req);
/* Handler for blocking callbacks, which are not expected on this server;
 * the visible line only logs an error. The return value is on a line not
 * shown here. */
655 static int mdt_bl_callback(struct mdt_thread_info *info)
657 CERROR("bl callbacks should not happen on MDS\n");
/* Handler for completion callbacks, which are not expected on this server;
 * the visible line only logs an error. The return value is on a line not
 * shown here. */
662 static int mdt_cp_callback(struct mdt_thread_info *info)
664 CERROR("cp callbacks should not happen on MDS\n");
670 * Build (DLM) resource name from fid.
/* Fill @name from @f: the structure is zeroed, then slots 0, 1 and 2 take
 * the fid sequence, object id and version.
 * @f:    source fid.
 * @name: resource name to fill; fid_lock() dereferences the returned
 *        pointer, so the function evidently returns a resource name
 *        pointer (the return statement is not visible here). */
672 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
673 struct ldlm_res_id *name)
675 memset(name, 0, sizeof *name);
676 name->name[0] = fid_seq(f);
677 name->name[1] = fid_oid(f);
678 name->name[2] = fid_ver(f);
683 * Return true if resource is for object identified by fid.
/* Compare a resource name with a fid: non-zero when slots 0, 1 and 2 of
 * @name equal the sequence, object id and version of @f. This mirrors the
 * layout written by fid_build_res_name().
 * @f:    fid to compare against.
 * @name: resource name to test. */
685 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
687 return name->name[0] == fid_seq(f) &&
688 name->name[1] == fid_oid(f) &&
689 name->name[2] == fid_ver(f);
692 /* issues dlm lock on passed @ns, @f stores its lock handle into @lh. */
/* Enqueue an inodebits (LDLM_IBITS) lock on the resource named after @f.
 * @ns:     namespace to lock in.
 * @f:      fid identifying the resource.
 * @lh:     receives the lock handle.
 * @mode:   lock mode requested.
 * @policy: lock policy data passed through to the enqueue.
 * @res_id: caller-provided scratch space that is overwritten with the
 *          resource name built from @f.
 * Returns 0 when the enqueue yields ELDLM_OK and -EIO for every other
 * result, so the specific enqueue error is not propagated. */
693 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
694 struct lustre_handle *lh, ldlm_mode_t mode,
695 ldlm_policy_data_t *policy,
696 struct ldlm_res_id *res_id)
706 /* FIXME: is that correct to have @flags=0 here? */
707 rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, res_id),
708 LDLM_IBITS, policy, mode, &flags,
709 ldlm_blocking_ast, ldlm_completion_ast, NULL,
710 NULL, NULL, 0, NULL, lh);
711 RETURN(rc == ELDLM_OK ? 0 : -EIO);
/* Drop one reference of mode @mode on the lock behind @lh.
 * @ns: namespace of the lock (not used on the visible lines).
 * @f:  fid the lock is expected to protect; used only by the debug
 *      assertion that compares it with the resource name of the lock.
 * @lh: lock handle.
 * An invalid handle is reported with CERROR. */
714 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
715 struct lustre_handle *lh, ldlm_mode_t mode)
717 struct ldlm_lock *lock;
720 /* FIXME: this is debug stuff, remove it later. */
721 lock = ldlm_handle2lock(lh);
723 CERROR("invalid lock handle "LPX64, lh->cookie);
727 LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
729 ldlm_lock_decref(lh, mode);
/* Convert a generic lu_object pointer into the enclosing mdt_object.
 * Asserts that the object belongs to an mdt device.
 * @o: object embedded at mot_obj.mo_lu inside a struct mdt_object. */
733 static struct mdt_object *mdt_obj(struct lu_object *o)
735 LASSERT(lu_device_is_mdt(o->lo_dev));
736 return container_of0(o, struct mdt_object, mot_obj.mo_lu);
/* Find the object with fid @f in the site of device @d.
 * @ctxt: execution context passed to lu_object_find().
 * @d:    mdt device whose site is searched.
 * @f:    fid to look up.
 * One visible return casts the lu_object_find() result straight to an
 * mdt_object pointer. Callers in this file release the object with
 * mdt_object_put() and, in mdt_getattr_name_lock(), apply PTR_ERR() to the
 * result of the find-and-lock wrapper. */
739 struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
740 struct mdt_device *d,
741 const struct lu_fid *f)
745 o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
747 return (struct mdt_object *)o;
/* Take an inodebits lock on object @o.
 * @info:  per-thread request state; supplies the namespace and the scratch
 *         policy and resource id buffers.
 * @o:     object to lock.
 * @lh:    lock handle; must be unused and must already carry a lock mode
 *         other than LCK_MINMODE.
 * @ibits: inodebits to lock.
 * Returns the result of fid_lock(). */
752 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
753 struct mdt_lock_handle *lh, __u64 ibits)
755 ldlm_policy_data_t *policy = &info->mti_policy;
756 struct ldlm_res_id *res_id = &info->mti_res_id;
757 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
759 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
760 LASSERT(lh->mlh_mode != LCK_MINMODE);
762 policy->l_inodebits.bits = ibits;
764 return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, policy, res_id);
/* Release the lock held through @lh on object @o, if any.
 * @info: per-thread request state; supplies the namespace.
 * @o:    object the lock was taken on.
 * @lh:   lock handle; when it is in use the lock is dropped through
 *        fid_unlock() and the cookie is reset to zero. */
767 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
768 struct mdt_lock_handle *lh)
770 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
772 if (lustre_handle_is_used(&lh->mlh_lh)) {
773 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
774 lh->mlh_lh.cookie = 0;
/* Find the object with fid @f and lock it.
 * @info: per-thread request state.
 * @f:    fid to look up.
 * @lh:   lock handle that receives the lock.
 * The fourth parameter is declared on a line not shown here; the body
 * passes ibits to mdt_object_lock().
 * The visible lines show the object being released with mdt_object_put()
 * on one path after the lock attempt; the condition guarding that path is
 * not visible here. */
778 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
779 const struct lu_fid *f,
780 struct mdt_lock_handle *lh,
783 struct mdt_object *o;
785 o = mdt_object_find(info->mti_ctxt, info->mti_mdt, f);
789 result = mdt_object_lock(info, o, lh, ibits);
791 mdt_object_put(info->mti_ctxt, o);
/* Convenience wrapper: unlock object @o through @lh, then drop the object
 * reference, in that order.
 * @info: per-thread request state.
 * @o:    object to unlock and release.
 * @lh:   lock handle for the object. */
798 void mdt_object_unlock_put(struct mdt_thread_info * info,
799 struct mdt_object * o,
800 struct mdt_lock_handle *lh)
802 mdt_object_unlock(info, o, lh);
803 mdt_object_put(info->mti_ctxt, o);
/* Locate the handler for opcode @opc.
 * Walks the mdt_handlers slice table until an entry with a NULL handler
 * array, and for the slice whose half-open range [mos_opc_start,
 * mos_opc_end) contains @opc picks the entry at offset (opc - start).
 * The chosen entry is asserted to carry the same opcode. One visible path
 * sets the result to NULL for an unsupported opcode.
 * @opc: request opcode. */
806 static struct mdt_handler *mdt_handler_find(__u32 opc)
808 struct mdt_opc_slice *s;
809 struct mdt_handler *h;
812 for (s = mdt_handlers; s->mos_hs != NULL; s++) {
813 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
814 h = s->mos_hs + (opc - s->mos_opc_start);
816 LASSERT(h->mh_opc == opc);
818 h = NULL; /* unsupported opc */
/* Return the last xid recorded in the client data of the export that @req
 * arrived on. Dereferences req->rq_export without a NULL check, so the
 * caller must ensure the request has an export.
 * NOTE(review): mdt_update_last_transno() passes the result through
 * le64_to_cpu(), which suggests the stored value is little-endian -
 * confirm. */
825 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
827 return req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid;
/* Placeholder invoked by mdt_req_handle() on the incoming dlm request when
 * the mo_compat_resname option is set. No working code is visible here;
 * the return statement is on a line not shown. */
830 static int mdt_lock_resname_compat(struct mdt_device *m,
831 struct ldlm_request *req)
833 /* XXX something... later. */
/* Placeholder invoked by mdt_req_handle() on the outgoing dlm reply when
 * the mo_compat_resname option is set. No working code is visible here;
 * the return statement is on a line not shown. */
837 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
839 /* XXX something... later. */
844 * Generic code handling requests that have struct mdt_body passed in:
846 * - extract mdt_body from request and save it in @info, if present;
848 * - create lu_object, corresponding to the fid in mdt_body, and save it in
851 * - if HABEO_CORPUS flag is set for this request type check whether object
852 * actually exists on storage (lu_object_exists()).
/* Extract the mdt_body from the request, remember it in info->mti_body and
 * resolve its fid1 to an object.
 * @info:  per-thread request state.
 * @flags: handler flags; with HABEO_CORPUS an object that does not exist is
 *         released again instead of being stored in info->mti_object.
 * A fid that fails fid_is_sane() is reported with CERROR, and one visible
 * path takes PTR_ERR() of the lookup result. */
855 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
858 const struct mdt_body *body;
859 struct mdt_object *obj;
860 const struct lu_context *ctx;
861 struct req_capsule *pill;
863 ctx = info->mti_ctxt;
864 pill = &info->mti_pill;
866 body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
868 if (fid_is_sane(&body->fid1)) {
869 obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
871 if ((flags & HABEO_CORPUS) &&
872 !lu_object_exists(ctx,
873 &obj->mot_obj.mo_lu)) {
874 mdt_object_put(ctx, obj);
877 info->mti_object = obj;
881 result = PTR_ERR(obj);
883 CERROR("Invalid fid: "DFID3"\n", PFID3(&body->fid1));
/* Generic pre-processing shared by handlers: unpack the mdt_body when the
 * request format has one, then pack the reply when HABEO_REFERO is set and
 * unpacking succeeded.
 * @info:  per-thread request state.
 * @flags: handler flags, see enum mdt_handler_flags. */
891 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
893 struct req_capsule *pill;
897 pill = &info->mti_pill;
899 if (req_capsule_has_field(pill, &RMF_MDT_BODY))
900 result = mdt_body_unpack(info, flags);
904 if (result == 0 && (flags & HABEO_REFERO))
905 result = req_capsule_pack(pill);
910 /* FIXME: fake until journal callback is OK.*/
/* Stamp the reply with transaction numbers. Both device counters are
 * incremented under mdt_transno_lock and copied into the reply message,
 * the request and the obd device.
 * @info: per-thread request state; info->mti_mdt must be set.
 * NOTE(review): the counters are logged with CERROR on every call, which
 * looks like leftover tracing; a debug-level log would be the usual choice.
 * NOTE(review): last_xid in the reply is assigned twice on the visible
 * lines (from rq_xid, then from the export). Lines are missing between the
 * two, so they may sit in different conditional branches - confirm. */
911 int mdt_update_last_transno(struct mdt_thread_info *info)
914 __u64 last_committed;
915 struct mdt_device *mdt = info->mti_mdt;
916 struct ptlrpc_request * req = mdt_info_req(info);
918 LASSERT(mdt != NULL);
919 spin_lock(&mdt->mdt_transno_lock);
920 last_transno = ++ (mdt->mdt_last_transno);
921 last_committed = ++ (mdt->mdt_last_committed);
922 spin_unlock(&mdt->mdt_transno_lock);
924 CERROR("last_transno = %llu, last_committed = %llu\n",
925 last_transno, last_committed);
926 req->rq_repmsg->transno = req->rq_transno = last_transno;
927 req->rq_repmsg->last_xid = req->rq_xid;
928 req->rq_repmsg->last_committed = last_committed;
929 req->rq_export->exp_obd->obd_last_committed = last_committed;
931 req->rq_repmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
932 target_committed_to_req(req);
938 * Invoke handler for this request opc. Also do necessary preprocessing
939 * (according to handler ->mh_flags), and post-processing (setting of
940 * ->last_{xid,committed}).
/* Run handler @h for request @req.
 * Visible sequence: sanity assertions; optional fail-id hook; capsule
 * initialisation; installation of the handler request format followed by
 * generic unpack and pack; extraction of the dlm request for HABEO_CLAVIS
 * handlers; the handler callback; storing the result in req->rq_status;
 * optional reply compatibility fix-up; transno update; capsule teardown.
 * @info: per-thread request state.
 * @h:    handler chosen for the opcode of the request.
 * @req:  incoming request. */
942 static int mdt_req_handle(struct mdt_thread_info *info,
943 struct mdt_handler *h, struct ptlrpc_request *req)
950 LASSERT(h->mh_act != NULL);
951 LASSERT(h->mh_opc == req->rq_reqmsg->opc);
952 LASSERT(current->journal_info == NULL);
954 DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
956 if (h->mh_fail_id != 0)
957 OBD_FAIL_RETURN(h->mh_fail_id, 0);
/* handlers that need body unpacking or generic packing must name a format */
961 LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO), h->mh_fmt != NULL));
963 req_capsule_init(&info->mti_pill,
964 req, RCL_SERVER, info->mti_rep_buf_size);
966 if (h->mh_fmt != NULL) {
967 req_capsule_set(&info->mti_pill, h->mh_fmt);
968 result = mdt_unpack_req_pack_rep(info, flags);
971 if (result == 0 && flags & HABEO_CLAVIS) {
972 struct ldlm_request *dlm_req;
974 LASSERT(h->mh_fmt != NULL);
976 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
977 if (dlm_req != NULL) {
978 if (info->mti_mdt->mdt_opts.mo_compat_resname)
979 result = mdt_lock_resname_compat(info->mti_mdt,
981 info->mti_dlm_req = dlm_req;
983 CERROR("Can't unpack dlm request\n");
992 result = h->mh_act(info);
994 * XXX result value is unconditionally shoved into ->rq_status
995 * (original code sometimes placed error code into ->rq_status, and
996 * sometimes returned it to the
997 * caller). ptlrpc_server_handle_request() doesn't check return value
1000 req->rq_status = result;
1002 LASSERT(current->journal_info == NULL);
1004 if (result == 0 && flags & HABEO_CLAVIS &&
1005 info->mti_mdt->mdt_opts.mo_compat_resname) {
1006 struct ldlm_reply *dlm_rep;
1008 dlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1009 if (dlm_rep != NULL)
1010 result = mdt_lock_reply_compat(info->mti_mdt, dlm_rep);
1013 /* If we're DISCONNECTing, the mdt_export_data is already freed */
1014 if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
1015 /* FIXME: fake until journal callback is OK.*/
1016 mdt_update_last_transno(info);
1018 req_capsule_fini(&info->mti_pill);
/* Reset a lock handle to the unused state: zero cookie and LCK_MINMODE.
 * mdt_object_lock() asserts that the mode has been changed from
 * LCK_MINMODE before locking.
 * @lh: lock handle to initialise. */
1023 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1025 lh->mlh_lh.cookie = 0ull;
1026 lh->mlh_mode = LCK_MINMODE;
/* Finalise a lock handle: asserts that no lock is still held through it.
 * @lh: lock handle to check. */
1029 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1031 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
/* Prepare the per-thread request state for a new request: default reply
 * fail id, zeroed reply buffer sizes and freshly initialised lock handles.
 * @info: per-thread request state to reset. */
1034 static void mdt_thread_info_init(struct mdt_thread_info *info)
1038 info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1039 for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1040 info->mti_rep_buf_size[i] = 0;
/* i equals ARRAY_SIZE(info->mti_rep_buf_size) once the loop has finished */
1041 info->mti_rep_buf_nr = i;
1042 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1043 mdt_lock_handle_init(&info->mti_lh[i]);
/* Release per-request resources: drop the reference on info->mti_object if
 * one is held and check that every lock handle has been released.
 * @info: per-thread request state to clean up. */
1046 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1050 if (info->mti_object != NULL) {
1051 mdt_object_put(info->mti_ctxt, info->mti_object);
1052 info->mti_object = NULL;
1054 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1055 mdt_lock_handle_fini(&info->mti_lh[i]);
/* Check the message version against the version expected for its opcode
 * group: LUSTRE_OBD_VERSION, LUSTRE_MDS_VERSION, LUSTRE_DLM_VERSION or
 * LUSTRE_LOG_VERSION. A mismatch and an unknown opcode are both reported
 * with CERROR.
 * @msg: incoming request message.
 * NOTE(review): an in-code TODO says the check is disabled for
 * compatibility; the lines that disable it are not visible here - confirm
 * whether any of this code currently runs. */
1058 static int mds_msg_check_version(struct lustre_msg *msg)
1062 /* TODO: enable the below check while really introducing msg version.
1063 * it's disabled because it will break compatibility with b1_4.
1069 case MDS_DISCONNECT:
1071 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1073 CERROR("bad opc %u version %08x, expecting %08x\n",
1074 msg->opc, msg->version, LUSTRE_OBD_VERSION);
1078 case MDS_GETATTR_NAME:
1083 case MDS_DONE_WRITING:
1089 case MDS_QUOTACHECK:
1093 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1095 CERROR("bad opc %u version %08x, expecting %08x\n",
1096 msg->opc, msg->version, LUSTRE_MDS_VERSION);
1100 case LDLM_BL_CALLBACK:
1101 case LDLM_CP_CALLBACK:
1102 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1104 CERROR("bad opc %u version %08x, expecting %08x\n",
1105 msg->opc, msg->version, LUSTRE_DLM_VERSION);
1107 case OBD_LOG_CANCEL:
1108 case LLOG_ORIGIN_HANDLE_CREATE:
1109 case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1110 case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1111 case LLOG_ORIGIN_HANDLE_READ_HEADER:
1112 case LLOG_ORIGIN_HANDLE_CLOSE:
1114 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1116 CERROR("bad opc %u version %08x, expecting %08x\n",
1117 msg->opc, msg->version, LUSTRE_LOG_VERSION);
1120 CERROR("MDS unknown opcode %d\n", msg->opc);
/* Decide what to do with a request that arrives while the device is
 * recovering, based on its opcode.
 * @req:     incoming request.
 * @obd:     device in recovery.
 * @process: output; on one visible path it receives the result of
 *           target_queue_recovery_request().
 * Another visible path logs that the request is not permitted during
 * recovery, sets rq_status to -EAGAIN and returns ptlrpc_error(). Which
 * opcodes reach which path depends on case labels that are only partly
 * visible here. */
1126 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
1127 struct obd_device *obd, int *process)
1129 switch (req->rq_reqmsg->opc) {
1130 case MDS_CONNECT: /* This will never get here, but for completeness. */
1131 case OST_CONNECT: /* This will never get here, but for completeness. */
1132 case MDS_DISCONNECT:
1133 case OST_DISCONNECT:
1138 case MDS_SYNC: /* used in unmounting */
1142 *process = target_queue_recovery_request(req, obd);
1146 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1148 /* XXX what should we set rq_status to here? */
1149 req->rq_status = -EAGAIN;
1150 RETURN(ptlrpc_error(req));
1155 * Handle recovery. Return:
1156 * +1: continue request processing;
1157 * -ve: abort immediately with the given error code;
1158 * 0: send reply with error code in req->rq_status;
/* Recovery gate run before normal request processing.
 * Visible steps: special-case MDS_CONNECT; reject requests without an
 * export by setting rq_status to -ENOTCONN; assert that a request whose xid
 * equals the last recorded xid is flagged RESENT or REPLAY; sample the
 * abort and recovering flags under obd_processing_task_lock; then either
 * abort recovery or filter the request through
 * mdt_filter_recovery_request().
 * @req: incoming request. */
1160 static int mdt_recovery(struct ptlrpc_request *req)
1164 struct obd_device *obd;
1168 if (req->rq_reqmsg->opc == MDS_CONNECT)
1171 if (req->rq_export == NULL) {
1172 CERROR("operation %d on unconnected MDS from %s\n",
1173 req->rq_reqmsg->opc,
1174 libcfs_id2str(req->rq_peer));
1175 req->rq_status = -ENOTCONN;
1179 /* sanity check: if the xid matches, the request must be marked as a
1180 * resent or replayed */
1181 LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
1182 lustre_msg_get_flags(req->rq_reqmsg) &
1183 (MSG_RESENT | MSG_REPLAY)),
1184 "rq_xid "LPU64" matches last_xid, "
1185 "expected RESENT flag\n", req->rq_xid);
1187 /* else: note the opposite is not always true; a RESENT req after a
1188 * failover will usually not match the last_xid, since it was likely
1189 * never committed. A REPLAYed request will almost never match the
1190 * last xid, however it could for a committed, but still retained,
1193 obd = req->rq_export->exp_obd;
1195 /* Check for aborted recovery... */
1196 spin_lock_bh(&obd->obd_processing_task_lock);
1197 abort_recovery = obd->obd_abort_recovery;
1198 recovering = obd->obd_recovering;
1199 spin_unlock_bh(&obd->obd_processing_task_lock);
/* the flags were copied under the lock; the decisions below use the copies */
1200 if (abort_recovery) {
1201 target_abort_recovery(obd);
1202 } else if (recovering) {
1206 rc = mdt_filter_recovery_request(req, obd, &should_process);
1207 if (rc != 0 || !should_process) {
/* Send the reply for @req.
 * A request flagged MSG_LAST_REPLAY is queued as the final recovery reply
 * while the device is still recovering; on the visible fall-through path
 * rq_status becomes -ENOTCONN. Every other reply goes out through
 * target_send_reply() with the fail id stored in @info.
 * @req:  request being answered.
 * @info: per-thread request state (only mti_fail_id is read here). */
1215 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
1217 struct obd_device *obd;
1219 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1220 if (req->rq_reqmsg->opc != OBD_PING)
1221 DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1223 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1224 if (obd && obd->obd_recovering) {
1225 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1226 RETURN(target_queue_final_reply(req, req->rq_status));
1228 /* Lost a race with recovery; let the error path
1230 req->rq_status = -ENOTCONN;
1233 target_send_reply(req, req->rq_status, info->mti_fail_id);
1234 RETURN(req->rq_status);
/* Core of request processing: version check, recovery gate, handler lookup
 * by opcode, handler execution and reply.
 * An opcode without a handler gets rq_status -ENOTSUPP and an error reply.
 * The final CERROR reports a dropped malformed request; the condition that
 * leads to it is on lines not shown here.
 * @req:  incoming request.
 * @info: per-thread request state. */
1237 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
1239 struct mdt_handler *h;
1240 struct lustre_msg *msg;
1245 OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1247 LASSERT(current->journal_info == NULL);
1249 msg = req->rq_reqmsg;
1250 result = mds_msg_check_version(msg);
1252 result = mdt_recovery(req);
1255 h = mdt_handler_find(msg->opc);
1257 result = mdt_req_handle(info, h, req);
1259 req->rq_status = -ENOTSUPP;
1260 result = ptlrpc_error(req);
1265 result = mdt_reply(req, info);
1268 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
1273 * MDT handler function called by ptlrpc service thread when request comes.
1275 * XXX common "target" functionality should be factored into separate module
1276 * shared by mdt, ost and stand-alone services like fld.
/* Entry point called for each incoming request: fetches the per-thread
 * mdt_thread_info from the context of the service thread, initialises it,
 * resolves the mdt device from the export, runs mdt_handle0() and finalises
 * the thread info.
 * @req: incoming request; req->rq_svc_thread must carry a context bound to
 *       that same thread (both are asserted). */
1278 static int mdt_handle(struct ptlrpc_request *req)
1281 struct lu_context *ctx;
1282 struct mdt_thread_info *info;
1285 ctx = req->rq_svc_thread->t_ctx;
1286 LASSERT(ctx != NULL);
1287 LASSERT(ctx->lc_thread == req->rq_svc_thread);
1289 info = lu_context_key_get(ctx, &mdt_thread_key);
1290 LASSERT(info != NULL);
1292 mdt_thread_info_init(info);
1293 /* it can be NULL while CONNECT */
1295 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1297 result = mdt_handle0(req, info);
1298 mdt_thread_info_fini(info);
1302 /*Please move these functions from mds to mdt*/
/* Test disposition bits in an intent reply: returns the bits of @flag that
 * are set in rep->lock_policy_res1, so the result is a mask, not 0 or 1.
 * @rep:  ldlm reply carrying the disposition word.
 * @flag: disposition bit or bits to test.
 * Lines between the signature and the return are not visible here. */
1303 int intent_disposition(struct ldlm_reply *rep, int flag)
1307 return (rep->lock_policy_res1 & flag);
/* Set disposition bits in an intent reply by OR-ing @flag into
 * rep->lock_policy_res1.
 * @rep:  ldlm reply carrying the disposition word.
 * @flag: disposition bit or bits to set.
 * Lines between the signature and the assignment are not visible here. */
1310 void intent_set_disposition(struct ldlm_reply *rep, int flag)
1314 rep->lock_policy_res1 |= flag;
/* Prototypes of the two intent handlers referenced by the mdt_it_flavor
 * table; both are defined later in this file. Their trailing parameters
 * are on lines not shown here. */
1330 static int mdt_intent_getattr(enum mdt_it_code opcode,
1331 struct mdt_thread_info *info,
1332 struct ldlm_lock **,
1334 static int mdt_intent_reint(enum mdt_it_code opcode,
1335 struct mdt_thread_info *info,
1336 struct ldlm_lock **,
/* Table describing each intent flavor, indexed by enum mdt_it_code: the
 * request format, the handler to call and, for reint-based intents, the
 * reint opcode the request must carry. mdt_intent_reint() compares it_reint
 * with the opcode found in the request.
 * Several index designators and members are on lines not shown here, so the
 * mapping of the first entries to specific intent codes cannot be read off
 * this listing. */
1339 static struct mdt_it_flavor {
1340 const struct req_format *it_fmt;
1342 int (*it_act)(enum mdt_it_code ,
1343 struct mdt_thread_info *,
1344 struct ldlm_lock **,
1347 } mdt_it_flavor[] = {
1349 .it_fmt = &RQF_LDLM_INTENT,
1350 /*.it_flags = HABEO_REFERO,*/
1352 .it_act = mdt_intent_reint,
1353 .it_reint = REINT_OPEN
1356 .it_fmt = &RQF_LDLM_INTENT,
1358 .it_act = mdt_intent_reint,
1359 .it_reint = REINT_OPEN
1362 .it_fmt = &RQF_LDLM_INTENT,
1364 .it_act = mdt_intent_reint,
1365 .it_reint = REINT_CREATE
1367 [MDT_IT_GETATTR] = {
1368 .it_fmt = &RQF_LDLM_INTENT_GETATTR,
1370 .it_act = mdt_intent_getattr
1372 [MDT_IT_READDIR] = {
1378 .it_fmt = &RQF_LDLM_INTENT_GETATTR,
1380 .it_act = mdt_intent_getattr
1383 .it_fmt = &RQF_LDLM_INTENT_UNLINK,
1385 .it_act = NULL, /* XXX can be mdt_intent_reint, ? */
1386 .it_reint = REINT_UNLINK
1393 [MDT_IT_GETXATTR] = {
/* Intent handler for getattr-style intents.
 * Chooses the child inodebits from @opcode, runs mdt_getattr_name_lock()
 * with a temporary lock handle, records the outcome as disposition bits in
 * the dlm reply, and on success re-targets the child lock so that it
 * belongs to the export of the client.
 * @opcode: intent code; MDT_IT_GETATTR selects LOOKUP plus UPDATE bits,
 *          another visible branch selects LOOKUP only.
 * @info:   per-thread request state.
 * @lockp:  in: the lock created for the enqueue; its ASTs and remote handle
 *          are copied onto the child lock.
 * Visible return values are ELDLM_LOCK_ABORTED and ELDLM_LOCK_REPLACED.
 * NOTE(review): new_lock is dereferenced in the l_unlock() call after
 * LDLM_LOCK_PUT(new_lock). That is safe only if another reference keeps the
 * lock alive at that point - confirm, or swap the two statements. */
1400 static int mdt_intent_getattr(enum mdt_it_code opcode,
1401 struct mdt_thread_info *info,
1402 struct ldlm_lock **lockp,
1406 struct ldlm_lock *old_lock = *lockp;
1407 struct ldlm_lock *new_lock = NULL;
1408 struct ptlrpc_request *req = mdt_info_req(info);
1409 struct ldlm_reply *ldlm_rep;
1410 struct mdt_lock_handle tmp_lock;
1411 struct mdt_lock_handle *lhc = &tmp_lock;
1418 child_bits = MDS_INODELOCK_LOOKUP;
1420 case MDT_IT_GETATTR:
1421 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1424 CERROR("Unhandled till now");
/* tmp_lock lives on the stack, so it must be initialised before use */
1429 mdt_lock_handle_init(lhc);
1430 rc = mdt_getattr_name_lock(info, lhc, child_bits);
1431 ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1432 intent_set_disposition(ldlm_rep, DISP_IT_EXECD);
1433 intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
1437 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
1439 intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
1441 ldlm_rep->lock_policy_res2 = 0;
1442 RETURN(ELDLM_LOCK_ABORTED);
1445 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
1447 new_lock = ldlm_handle2lock(&lhc->mlh_lh);
1448 if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1451 LASSERTF(new_lock != NULL, "op %d lockh "LPX64"\n",
1452 opcode, lhc->mlh_lh.cookie);
1456 /* FIXME:This only happens when MDT can handle RESENT */
1457 if (new_lock->l_export == req->rq_export) {
1458 /* Already gave this to the client, which means that we
1459 * reconstructed a reply. */
1460 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1462 RETURN(ELDLM_LOCK_REPLACED);
1465 /* Fixup the lock to be given to the client */
1466 l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1467 new_lock->l_readers = 0;
1468 new_lock->l_writers = 0;
1470 new_lock->l_export = class_export_get(req->rq_export);
1471 list_add(&new_lock->l_export_chain,
1472 &new_lock->l_export->exp_ldlm_data.led_held_locks);
1474 new_lock->l_blocking_ast = old_lock->l_blocking_ast;
1475 new_lock->l_completion_ast = old_lock->l_completion_ast;
1477 new_lock->l_remote_handle = old_lock->l_remote_handle;
1479 new_lock->l_flags &= ~LDLM_FL_LOCAL;
1481 LDLM_LOCK_PUT(new_lock);
1482 l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1484 RETURN(ELDLM_LOCK_REPLACED);
/*
 * Intent handler for reint-based intents. Derives the reint opcode from
 * the request with mdt_reint_opcode() (using the per-opcode intent formats
 * below), checks that it matches the opcode recorded for this intent in
 * mdt_it_flavor[], packs the reply, runs mdt_reint_internal() and stores
 * its result in lock_policy_res2 of the DLM reply.
 *
 * The visible completion path returns ELDLM_LOCK_ABORTED.
 */
1487 static int mdt_intent_reint(enum mdt_it_code opcode,
1488 struct mdt_thread_info *info,
1489 struct ldlm_lock **lockp,
1494 struct ldlm_reply *rep;
/* Request formats for the reint opcodes that may arrive inside an intent. */
1496 static const struct req_format *intent_fmts[REINT_MAX] = {
1497 [REINT_CREATE] = &RQF_LDLM_INTENT_CREATE,
1498 [REINT_OPEN] = &RQF_LDLM_INTENT_OPEN
1503 opc = mdt_reint_opcode(info, intent_fmts);
/* The reint opcode in the request must agree with the intent flavor. */
1507 if (mdt_it_flavor[opcode].it_reint != opc) {
1508 CERROR("Reint code %ld doesn't match intent: %d\n",
1513 rc = req_capsule_pack(&info->mti_pill);
1517 rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
/* The reint result travels back to the client in lock_policy_res2. */
1520 rep->lock_policy_res2 = mdt_reint_internal(info, opc);
1521 intent_set_disposition(rep, DISP_IT_EXECD);
1523 RETURN(ELDLM_LOCK_ABORTED);
/*
 * Translate an intent opcode as carried in the request (IT_* values) into
 * the matching MDT_IT_* index used for mdt_it_flavor[]. An opcode that is
 * not recognised is reported with CERROR.
 */
1526 static int mdt_intent_code(long itcode)
1532 result = MDT_IT_OPEN;
1534 case IT_OPEN|IT_CREAT:
1535 result = MDT_IT_OCREAT;
1538 result = MDT_IT_CREATE;
1541 result = MDT_IT_READDIR;
1544 result = MDT_IT_GETATTR;
1547 result = MDT_IT_LOOKUP;
1550 result = MDT_IT_UNLINK;
1553 result = MDT_IT_TRUNC;
1556 result = MDT_IT_GETXATTR;
1559 CERROR("Unknown intent opcode: %ld\n", itcode);
/*
 * Dispatch a single intent. Maps itopc to a table index with
 * mdt_intent_code(), extends the request capsule with the flavor format
 * when one is set, unpacks the request and packs the reply through
 * mdt_unpack_req_pack_rep(), then invokes the flavor handler.
 * NOTE(review): mdt_it_flavor[] contains an entry with .it_act = NULL and
 * no NULL check on flv->it_act is visible before the call - confirm.
 */
1566 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
1567 struct ldlm_lock **lockp, int flags)
1569 struct req_capsule *pill;
1570 struct mdt_it_flavor *flv;
1575 opc = mdt_intent_code(itopc);
1579 pill = &info->mti_pill;
1580 flv = &mdt_it_flavor[opc];
1582 if (flv->it_fmt != NULL)
1583 req_capsule_extend(pill, flv->it_fmt);
1585 rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
1587 /* execute policy */
1588 /*XXX LASSERT( flv->it_act) */
1590 rc = flv->it_act(opc, info, lockp, flags);
/*
 * LDLM intent policy callback (registered for the MDT namespace through
 * ldlm_register_intent() in mdt_init0()). req_cookie is the ptlrpc request
 * being served. The per-thread mdt_thread_info is fetched from the service
 * thread context; when the request carries more buffers than
 * MDS_REQ_INTENT_IT_OFF the intent is extracted and dispatched through
 * mdt_intent_opc(), otherwise the request is a plain enqueue and only the
 * reply is packed.
 */
1597 static int mdt_intent_policy(struct ldlm_namespace *ns,
1598 struct ldlm_lock **lockp, void *req_cookie,
1599 ldlm_mode_t mode, int flags, void *data)
1601 struct mdt_thread_info *info;
1602 struct ptlrpc_request *req = req_cookie;
1603 struct ldlm_intent *it;
1604 struct req_capsule *pill;
1605 struct ldlm_lock *lock = *lockp;
1610 LASSERT(req != NULL);
1612 info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
1613 LASSERT(info != NULL);
1614 pill = &info->mti_pill;
/* The capsule must already be bound to this very request. */
1615 LASSERT(pill->rc_req == req);
1617 if (req->rq_reqmsg->bufcount > MDS_REQ_INTENT_IT_OFF) {
1618 req_capsule_extend(pill, &RQF_LDLM_INTENT);
1619 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
1621 LDLM_DEBUG(lock, "intent policy opc: %s",
1622 ldlm_it2str(it->opc));
1624 rc = mdt_intent_opc(it->opc, info, lockp, flags);
1625 mdt_update_last_transno(info);
1631 /* No intent was provided */
1632 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
1633 rc = req_capsule_pack(pill);
/*
 * Shut down and free the sequence servers attached to the site of this
 * device (ls_server_seq and ls_ctlr_seq). Each pointer is reset to NULL
 * after being freed, so a later call skips it.
 */
1641 static int mdt_seq_fini(const struct lu_context *ctx,
1642 struct mdt_device *m)
1644 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1647 if (ls && ls->ls_server_seq) {
1648 seq_server_fini(ls->ls_server_seq, ctx);
1649 OBD_FREE_PTR(ls->ls_server_seq);
1650 ls->ls_server_seq = NULL;
1652 if (ls && ls->ls_ctlr_seq) {
1653 seq_server_fini(ls->ls_ctlr_seq, ctx);
1654 OBD_FREE_PTR(ls->ls_ctlr_seq);
1655 ls->ls_ctlr_seq = NULL;
/*
 * Allocate and initialise the sequence servers of the site. When the site
 * node id is 0 a controller sequence server (ls_ctlr_seq) is created; a
 * server sequence (ls_server_seq) is created as well. mdt_seq_fini() is
 * called from here to release what was set up (presumably only on
 * failure; the guarding condition is not visible in this listing).
 */
1660 static int mdt_seq_init(const struct lu_context *ctx,
1662 struct mdt_device *m)
1668 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1670 /* sequence-controller node */
1671 if (ls->ls_node_id == 0) {
1672 LASSERT(ls->ls_ctlr_seq == NULL);
1673 OBD_ALLOC_PTR(ls->ls_ctlr_seq);
1675 if (ls->ls_ctlr_seq != NULL) {
1676 rc = seq_server_init(ls->ls_ctlr_seq,
1677 m->mdt_bottom, uuid,
1684 LASSERT(ls->ls_server_seq == NULL);
1685 OBD_ALLOC_PTR(ls->ls_server_seq);
1687 if (ls->ls_server_seq != NULL) {
1688 rc = seq_server_init(ls->ls_server_seq,
1689 m->mdt_bottom, uuid,
1696 mdt_seq_fini(ctx, m);
/*
 * Connect this MDT to the sequence controller. The MDC index is parsed
 * from config string 2; the index and the current controller export are
 * then tested (the action taken when index != 0 or an export already
 * exists is not visible in this listing). The MDC is looked up by the
 * uuid held in config string 1 and connected to, the resulting export is
 * stored in ls_ctlr_exp, a client sequence is allocated and initialised,
 * and seq_server_init_ctlr() is called on the server sequence.
 */
1701 /* XXX: this is ugly, should be something else */
1702 static int mdt_seq_init_ctlr(const struct lu_context *ctx,
1703 struct mdt_device *m,
1704 struct lustre_cfg *cfg)
1706 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1707 struct obd_device *mdc;
1708 struct obd_uuid *uuidp;
1712 struct mdt_thread_info *info;
1713 char *p, *index_string = lustre_cfg_string(cfg, 2);
/* Borrow the uuid scratch buffer from the per-thread info. */
1716 info = lu_context_key_get(ctx, &mdt_thread_key);
1717 uuidp = &info->mti_u.uuid;
1719 LASSERT(index_string);
1721 index = simple_strtol(index_string, &p, 10);
/* NOTE(review): the message below says lustre_cgf; lustre_cfg is likely meant. */
1723 CERROR("Invalid index in lustre_cgf, offset 2\n");
1727 /* check if this is first MDC add and controller is not yet
1729 if (index != 0 || ls->ls_ctlr_exp)
1732 uuid_str = lustre_cfg_string(cfg, 1);
1733 obd_str2uuid(uuidp, uuid_str);
1734 mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, NULL);
1736 CERROR("can't find controller MDC by uuid %s\n",
1739 } else if (!mdc->obd_set_up) {
1740 CERROR("target %s not set up\n", mdc->obd_name);
1743 struct lustre_handle conn = {0, };
1745 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
1746 mdc->obd_name, mdc->obd_uuid.uuid);
1748 rc = obd_connect(&conn, mdc, &mdc->obd_uuid, NULL);
1751 CERROR("target %s connect error %d\n",
1754 ls->ls_ctlr_exp = class_conn2export(&conn);
1756 OBD_ALLOC_PTR(ls->ls_client_seq);
1758 if (ls->ls_client_seq != NULL) {
1759 rc = seq_client_init(ls->ls_client_seq,
1768 LASSERT(ls->ls_server_seq != NULL);
1770 rc = seq_server_init_ctlr(ls->ls_server_seq,
/*
 * Counterpart of mdt_seq_init_ctlr(): calls seq_server_fini_ctlr() on the
 * server sequence if one exists, finalises and frees the client sequence,
 * and disconnects the controller export, logging a disconnect failure.
 */
1779 static void mdt_seq_fini_ctlr(struct mdt_device *m)
1785 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1787 if (ls && ls->ls_server_seq)
1788 seq_server_fini_ctlr(ls->ls_server_seq);
1790 if (ls && ls->ls_client_seq) {
1791 seq_client_fini(ls->ls_client_seq);
1792 OBD_FREE_PTR(ls->ls_client_seq);
1793 ls->ls_client_seq = NULL;
1796 if (ls && ls->ls_ctlr_exp) {
1797 int rc = obd_disconnect(ls->ls_ctlr_exp);
/* The export pointer is cleared regardless of the disconnect result. */
1798 ls->ls_ctlr_exp = NULL;
1801 CERROR("failure to disconnect "
/*
 * Allocate the FLD server of the site and initialise it on top of
 * m->mdt_bottom with the given uuid. The server is freed again and the
 * pointer cleared on the cleanup path shown below (presumably when
 * fld_server_init() fails; the condition is not visible in this listing).
 */
1811 static int mdt_fld_init(const struct lu_context *ctx,
1813 struct mdt_device *m)
1819 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1821 OBD_ALLOC_PTR(ls->ls_server_fld);
1823 if (ls->ls_server_fld != NULL) {
1824 rc = fld_server_init(ls->ls_server_fld, ctx,
1825 m->mdt_bottom, uuid);
1827 OBD_FREE_PTR(ls->ls_server_fld);
1828 ls->ls_server_fld = NULL;
/*
 * Finalise and free the FLD server of the site, if one was set up, and
 * clear the pointer so that a later call skips it.
 */
1836 static int mdt_fld_fini(const struct lu_context *ctx,
1837 struct mdt_device *m)
1839 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1842 if (ls && ls->ls_server_fld) {
1843 fld_server_fini(ls->ls_server_fld, ctx);
1844 OBD_FREE_PTR(ls->ls_server_fld);
1845 ls->ls_server_fld = NULL;
1850 /* device init/fini methods */
/*
 * Unregister the MDT ptlrpc service if it is running and clear the
 * pointer; does nothing when no service is registered.
 */
1851 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
1853 if (m->mdt_service != NULL) {
1854 ptlrpc_unregister_service(m->mdt_service);
1855 m->mdt_service = NULL;
/*
 * Create and start the MDT ptlrpc service. The service configuration
 * (buffer count and size, maximum request and reply sizes, portals,
 * watchdog timeout, thread count derived from mdt_num_threads with
 * MDT_MIN_THREADS as the lower bound) is built on the stack, the LDLM
 * callback client is initialised, the service is created with mdt_handle
 * as its request handler and its threads are started. When thread start
 * fails the err_mdt_svc path unregisters the service and clears
 * m->mdt_service.
 */
1859 static int mdt_start_ptlrpc_service(struct mdt_device *m)
1862 struct ptlrpc_service_conf conf = {
1863 .psc_nbufs = MDS_NBUFS,
1864 .psc_bufsize = MDS_BUFSIZE,
1865 .psc_max_req_size = MDS_MAXREQSIZE,
1866 .psc_max_reply_size = MDS_MAXREPSIZE,
1867 .psc_req_portal = MDS_REQUEST_PORTAL,
1868 .psc_rep_portal = MDC_REPLY_PORTAL,
1869 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
1871 * We'd like to have a mechanism to set this on a per-device
1872 * basis, but alas...
1874 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
1881 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1882 "mdt_ldlm_client", &m->mdt_ldlm_client);
1885 ptlrpc_init_svc_conf(&conf, mdt_handle, LUSTRE_MDT0_NAME,
1886 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1888 if (m->mdt_service == NULL)
1891 rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1893 GOTO(err_mdt_svc, rc);
1897 ptlrpc_unregister_service(m->mdt_service);
1898 m->mdt_service = NULL;
/*
 * Tear down the device stack starting at d. For each layer the device
 * type fini method is called (it returns the next device), the device is
 * freed and the reference on its obd type is dropped. m->mdt_child is
 * cleared at the end.
 */
1903 static void mdt_stack_fini(const struct lu_context *ctx,
1904 struct mdt_device *m, struct lu_device *d)
1906 /* goes through all stack */
1908 struct lu_device *n;
1909 struct obd_type *type;
1910 struct lu_device_type *ldt = d->ld_type;
1914 /* each fini() returns next device in stack of layers
1915 * * so we can avoid the recursion */
1916 n = ldt->ldt_ops->ldto_device_fini(ctx, d);
1917 ldt->ldt_ops->ldto_device_free(ctx, d);
1919 type = ldt->ldt_obd_type;
1921 class_put_type(type);
1922 /* switch to the next device in the layer */
1925 m->mdt_child = NULL;
/*
 * Instantiate one layer of the device stack. Looks up the obd type by
 * name, allocates a device of that type, makes it share the site of child
 * and initialises it with child as its peer in the stack. The error
 * labels free the device (out_alloc) and drop the type reference
 * (out_type).
 *
 * Error codes visible here: -ENODEV for an unknown type or a failed
 * device allocation, -EINVAL for the second type check, or the rc of the
 * device init method.
 */
1928 static struct lu_device *mdt_layer_setup(const struct lu_context *ctx,
1929 const char *typename,
1930 struct lu_device *child,
1931 struct lustre_cfg *cfg)
1933 struct obd_type *type;
1934 struct lu_device_type *ldt;
1935 struct lu_device *d;
1939 type = class_get_type(typename);
1941 CERROR("Unknown type: '%s'\n", typename);
1942 GOTO(out, rc = -ENODEV);
1947 CERROR("type: '%s'\n", typename);
1948 GOTO(out_type, rc = -EINVAL);
/* Remember the obd type so that mdt_stack_fini() can release it later. */
1951 ldt->ldt_obd_type = type;
1952 d = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
1954 CERROR("Cannot allocate device: '%s'\n", typename);
1955 GOTO(out_type, rc = -ENODEV);
/* The new layer lives in the same site as the device it is stacked with. */
1958 LASSERT(child->ld_site);
1959 d->ld_site = child->ld_site;
1962 rc = ldt->ldt_ops->ldto_device_init(ctx, d, child);
1964 CERROR("can't init device '%s', rc %d\n", typename, rc);
1965 GOTO(out_alloc, rc);
1971 ldt->ldt_ops->ldto_device_free(ctx, d);
1974 class_put_type(type);
/*
 * Build the device stack for this MDT: the OSD layer first (recorded as
 * m->mdt_bottom), then MDD, then CMM. The resulting top of the stack is
 * stored in m->mdt_child and the setup config is then passed to the
 * process_config method of the MDT device. The out path unwinds with
 * mdt_stack_fini().
 * NOTE(review): every mdt_layer_setup() call below passes d; the lines
 * that advance d between layers are not visible in this listing -
 * confirm d is updated to the newly created layer each time.
 */
1979 static int mdt_stack_init(const struct lu_context *ctx,
1980 struct mdt_device *m, struct lustre_cfg *cfg)
1982 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1983 struct lu_device *tmp;
1987 /* init the stack */
1988 tmp = mdt_layer_setup(ctx, LUSTRE_OSD0_NAME, d, cfg);
1990 RETURN(PTR_ERR(tmp));
1992 m->mdt_bottom = lu2dt_dev(tmp);
1994 tmp = mdt_layer_setup(ctx, LUSTRE_MDD0_NAME, d, cfg);
1996 GOTO(out, rc = PTR_ERR(tmp));
1999 tmp = mdt_layer_setup(ctx, LUSTRE_CMM0_NAME, d, cfg);
2001 GOTO(out, rc = PTR_ERR(tmp));
2004 m->mdt_child = lu2md_dev(d);
2006 /* process setup config */
2007 tmp = &m->mdt_md_dev.md_lu_dev;
2008 rc = tmp->ld_ops->ldo_process_config(ctx, tmp, cfg);
2012 /* fini from last known good lu_device */
2014 mdt_stack_fini(ctx, m, d);
/*
 * Full teardown of an MDT device: filesystem cleanup, ping evictor and
 * ptlrpc service stop, device stack teardown, FLD and sequence manager
 * shutdown, md device finalisation and LDLM namespace release.
 * NOTE(review): mdt_seq_fini() frees ls_server_seq and sets it to NULL,
 * and mdt_seq_fini_ctlr() only calls seq_server_fini_ctlr() when
 * ls_server_seq is non-NULL, so with the order below that call is
 * skipped - confirm this is intended.
 */
2019 static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
2021 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2022 struct lu_site *ls = d->ld_site;
2027 mdt_fs_cleanup(ctx, m);
2029 ping_evictor_stop();
2030 mdt_stop_ptlrpc_service(m);
2032 /* finish the stack */
2033 mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2035 mdt_fld_fini(ctx, m);
2036 mdt_seq_fini(ctx, m);
2037 mdt_seq_fini_ctlr(m);
/* No reference to the device may remain at this point. */
2039 LASSERT(atomic_read(&d->ld_ref) == 0);
2040 md_device_fini(&m->mdt_md_dev);
2042 if (m->mdt_namespace != NULL) {
2043 ldlm_namespace_free(m->mdt_namespace, 0);
2044 m->mdt_namespace = NULL;
/*
 * Initialise an MDT device from its setup config. Config string 0 names
 * the obd device and config string 2 holds the server index. Visible
 * steps, in order: fake transno and size defaults, md device and lu_site
 * init, device stack init, node id assignment, FLD and sequence manager
 * init, LDLM server namespace creation with mdt_intent_policy registered
 * as intent handler, ptlrpc service start and ping evictor start. The
 * error labels at the end unwind these steps in reverse order.
 */
2055 static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
2056 struct lu_device_type *t, struct lustre_cfg *cfg)
2061 const char *dev = lustre_cfg_string(cfg, 0);
2062 const char *num = lustre_cfg_string(cfg, 2);
2063 struct obd_device *obd;
2066 obd = class_name2obd(dev);
2067 m->mdt_md_dev.md_lu_dev.ld_obd = obd;
2069 spin_lock_init(&m->mdt_transno_lock);
2070 /* FIXME: We need to load them from disk. But now fake it */
2071 m->mdt_last_transno = 100;
2072 m->mdt_last_committed = 99;
2073 m->mdt_max_mdsize = MAX_MD_SIZE;
2074 m->mdt_max_cookiesize = sizeof(struct llog_cookie);
2076 /* Temporary. should parse mount option. */
2077 m->mdt_opts.mo_user_xattr = 0;
2078 m->mdt_opts.mo_acl = 0;
2079 m->mdt_opts.mo_compat_resname = 0;
2086 md_device_init(&m->mdt_md_dev, t);
2087 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
2089 rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
2091 CERROR("can't init lu_site, rc %d\n", rc);
2092 GOTO(err_free_site, rc);
2095 /* init the stack */
2096 rc = mdt_stack_init(ctx, m, cfg);
2098 CERROR("can't init device stack, rc %d\n", rc);
2099 GOTO(err_fini_site, rc);
2102 /* set server index */
2104 s->ls_node_id = simple_strtol(num, NULL, 10);
2106 rc = mdt_fld_init(ctx, obd->obd_name, m);
2108 GOTO(err_fini_stack, rc);
2110 rc = mdt_seq_init(ctx, obd->obd_name, m);
2112 GOTO(err_fini_fld, rc);
/* The namespace name embeds the device address to keep it unique. */
2114 snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
2115 m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
2116 if (m->mdt_namespace == NULL)
2117 GOTO(err_fini_seq, rc = -ENOMEM);
2119 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
2121 rc = mdt_start_ptlrpc_service(m);
2123 GOTO(err_free_ns, rc);
2125 ping_evictor_start();
2126 /* TODO: wait for read & write
2127 rc = mdt_fs_setup(ctx, m);
2129 GOTO(err_stop_service, rc);
2134 mdt_stop_ptlrpc_service(m);
2137 ldlm_namespace_free(m->mdt_namespace, 0);
2138 m->mdt_namespace = NULL;
2140 mdt_seq_fini(ctx, m);
2142 mdt_fld_fini(ctx, m);
2144 mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2149 md_device_fini(&m->mdt_md_dev);
2153 /* used by MGS to process specific configurations */
/*
 * Config entry point of the MDT device (installed as ldo_process_config
 * in mdt_lu_ops). One command case hooks mdt_seq_init_ctlr() to connect
 * to the sequence controller; commands are otherwise forwarded to the
 * next device in the stack.
 */
2154 static int mdt_process_config(const struct lu_context *ctx,
2155 struct lu_device *d, struct lustre_cfg *cfg)
2157 struct lu_device *next = md2lu_dev(mdt_dev(d)->mdt_child);
2161 switch (cfg->lcfg_command) {
2163 /* add mdc hook to get first MDT uuid and connect it to
2164 * ls->controller to use for seq manager. */
2165 err = mdt_seq_init_ctlr(ctx, mdt_dev(d), cfg);
2167 CERROR("can't initialize controller export, "
2170 /* all MDT specific commands should be here */
2172 /* others are passed further */
2173 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
/*
 * Allocate an MDT object for device d. The object embeds its own
 * lu_object_header (mot_header): the header is initialised, the lu_object
 * is bound to it and to d, added as the top object of the header, and
 * given the MDT object operations.
 */
2178 static struct lu_object *mdt_object_alloc(const struct lu_context *ctxt,
2179 const struct lu_object_header *hdr,
2180 struct lu_device *d)
2182 struct mdt_object *mo;
2188 struct lu_object *o;
2189 struct lu_object_header *h;
2191 o = &mo->mot_obj.mo_lu;
2192 h = &mo->mot_header;
2193 lu_object_header_init(h);
2194 lu_object_init(o, h, d);
2195 lu_object_add_top(h, o);
2196 o->lo_ops = &mdt_obj_ops;
/*
 * Object init method: asks the child device (mdt_child) to allocate its
 * slice of the object for the same header and, when that succeeds, links
 * it below the MDT object.
 */
2202 static int mdt_object_init(const struct lu_context *ctxt, struct lu_object *o)
2204 struct mdt_device *d = mdt_dev(o->lo_dev);
2205 struct lu_device *under;
2206 struct lu_object *below;
2208 under = &d->mdt_child->md_lu_dev;
2209 below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
2210 if (below != NULL) {
2211 lu_object_add(o, below);
/*
 * Object free method: finalises the lu_object_header of the MDT object
 * (the remaining release steps are not visible in this listing).
 */
2217 static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o)
2219 struct mdt_object *mo = mdt_obj(o);
2220 struct lu_object_header *h;
2225 lu_object_header_fini(h);
/*
 * Object print method: writes the MDT type name followed by the object
 * address to the seq_file and returns the seq_printf() result.
 */
2230 static int mdt_object_print(const struct lu_context *ctxt,
2231 struct seq_file *f, const struct lu_object *o)
2233 return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
/*
 * Existence check for an MDT object: delegates to lu_object_exists() on
 * the next object in the layer stack and returns its result.
 */
2236 int mdt_object_exists(const struct lu_context *ctx,
2237 const struct lu_object *o)
2239 return lu_object_exists(ctx, lu_object_next(o));
/* Device operations of the MDT layer: object allocation and config handling. */
2242 static struct lu_device_operations mdt_lu_ops = {
2243 .ldo_object_alloc = mdt_object_alloc,
2244 .ldo_process_config = mdt_process_config
/* Object operations installed on every MDT object by mdt_object_alloc(). */
2247 static struct lu_object_operations mdt_obj_ops = {
2248 .loo_object_init = mdt_object_init,
2249 .loo_object_free = mdt_object_free,
2250 .loo_object_print = mdt_object_print,
2251 .loo_object_exists = mdt_object_exists
2254 /* mds_connect_internal */
/*
 * Negotiate connection parameters with a client. The connect flags and
 * inode lock bits sent by the client are masked down to what the MDT
 * supports, ACL and XATTR support are dropped when the matching device
 * options are off, and the outcome is recorded on the export together
 * with the server version in the connect data. A warning is logged when
 * ACLs are enabled on the server but the client did not ask for them.
 */
2255 static int mdt_connect0(struct mdt_device *mdt,
2256 struct obd_export *exp, struct obd_connect_data *data)
2259 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
2260 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
2262 /* If no known bits (which should not happen, probably,
2263 as everybody should support LOOKUP and UPDATE bits at least)
2264 revert to compat mode with plain locks. */
2265 if (!data->ocd_ibits_known &&
2266 data->ocd_connect_flags & OBD_CONNECT_IBITS)
2267 data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
2269 if (!mdt->mdt_opts.mo_acl)
2270 data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
2272 if (!mdt->mdt_opts.mo_user_xattr)
2273 data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
/* Publish the negotiated values on the export and in the reply data. */
2275 exp->exp_connect_flags = data->ocd_connect_flags;
2276 data->ocd_version = LUSTRE_VERSION_CODE;
2277 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
2280 if (mdt->mdt_opts.mo_acl &&
2281 ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
2282 CWARN("%s: MDS requires ACL support but client does not\n",
2283 mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
2289 /* mds_connect copy */
/*
 * obd connect method of the MDT. Creates a private lu_context, connects
 * the client through class_connect(), negotiates flags with
 * mdt_connect0(), copies the client uuid into the client data record and
 * registers the client with mdt_client_add(). The tail of the function
 * disconnects the export (presumably only on error; the condition is not
 * visible), drops the export reference and releases the context.
 * NOTE(review): the NULL check on conn, obd and cluuid comes after
 * lu_context_init() and lu_context_enter(); confirm that its early exit
 * also exits and finalises the context.
 */
2290 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
2291 struct obd_uuid *cluuid,
2292 struct obd_connect_data *data)
2294 struct obd_export *exp;
2296 struct mdt_device *mdt;
2297 struct mdt_export_data *med;
2298 struct mdt_client_data *mcd;
2299 struct lu_context ctxt;
2302 rc = lu_context_init(&ctxt);
2305 lu_context_enter(&ctxt);
2307 if (!conn || !obd || !cluuid)
2310 mdt = mdt_dev(obd->obd_lu_dev);
2312 rc = class_connect(conn, obd, cluuid);
2316 exp = class_conn2export(conn);
2317 LASSERT(exp != NULL);
2318 med = &exp->exp_mdt_data;
2320 rc = mdt_connect0(mdt, exp, data);
2324 memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
2326 rc = mdt_client_add(&ctxt, mdt, med, -1);
2331 class_disconnect(exp);
2333 class_export_put(exp);
2334 lu_context_exit(&ctxt);
2335 lu_context_fini(&ctxt);
/*
 * obd disconnect method of the MDT. Holds an extra export reference for
 * the duration of the call, disconnects the export first, then walks the
 * list of outstanding replies under exp_lock and schedules each one for
 * completion under the lock of its service.
 */
2340 static int mdt_obd_disconnect(struct obd_export *exp)
2342 unsigned long irqflags;
2347 class_export_get(exp);
2349 /* Disconnect early so that clients can't keep using export */
2350 rc = class_disconnect(exp);
2351 //ldlm_cancel_locks_for_export(exp);
2353 /* complete all outstanding replies */
2354 spin_lock_irqsave(&exp->exp_lock, irqflags);
2355 while (!list_empty(&exp->exp_outstanding_replies)) {
2356 struct ptlrpc_reply_state *rs =
2357 list_entry(exp->exp_outstanding_replies.next,
2358 struct ptlrpc_reply_state, rs_exp_list);
2359 struct ptlrpc_service *svc = rs->rs_service;
/* Unlink the reply from the export before handing it to the service. */
2361 spin_lock(&svc->srv_lock);
2362 list_del_init(&rs->rs_exp_list);
2363 ptlrpc_schedule_difficult_reply(rs);
2364 spin_unlock(&svc->srv_lock);
2366 spin_unlock_irqrestore(&exp->exp_lock, irqflags);
2368 class_export_put(exp);
2372 /* FIXME: Can we avoid using these two interfaces? */
/*
 * Export constructor: initialises the open-file list and its lock in the
 * MDT export data and marks the export as connecting.
 */
2373 static int mdt_init_export(struct obd_export *exp)
2375 struct mdt_export_data *med = &exp->exp_mdt_data;
2378 INIT_LIST_HEAD(&med->med_open_head);
2379 spin_lock_init(&med->med_open_lock);
2380 exp->exp_connecting = 1;
/*
 * Export destructor. After the generic target_destroy_export() the client
 * uuid is compared with the uuid of the obd itself (the action taken on a
 * match is not visible in this listing). A private lu_context is then set
 * up, every file still open on the export is closed through
 * mdt_mfd_close(), and the client data is released with
 * mdt_client_free().
 */
2384 static int mdt_destroy_export(struct obd_export *export)
2386 struct mdt_export_data *med;
2387 struct obd_device *obd = export->exp_obd;
2388 struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
2389 struct lu_context ctxt;
2393 med = &export->exp_mdt_data;
2394 target_destroy_export(export);
2396 if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
2399 rc = lu_context_init(&ctxt);
2403 lu_context_enter(&ctxt);
2404 /* Close any open files (which may also cause orphan unlinking). */
2405 spin_lock(&med->med_open_lock);
2406 while (!list_empty(&med->med_open_head)) {
2407 struct list_head *tmp = med->med_open_head.next;
2408 struct mdt_file_data *mfd =
2409 list_entry(tmp, struct mdt_file_data, mfd_list);
2411 /* Remove mfd handle so it can't be found again.
2412 * We are consuming the mfd_list reference here. */
2413 class_handle_unhash(&mfd->mfd_handle);
2414 list_del_init(&mfd->mfd_list);
/* The list lock is dropped around the close and retaken afterwards. */
2415 spin_unlock(&med->med_open_lock);
2417 rc = mdt_mfd_close(&ctxt, mfd, 0);
2420 CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
2421 spin_lock(&med->med_open_lock);
2423 spin_unlock(&med->med_open_lock);
2424 mdt_client_free(&ctxt, mdt, med);
2426 lu_context_exit(&ctxt);
2427 lu_context_fini(&ctxt);
/*
 * obd notify method of the MDT: sets up a private lu_context and forwards
 * the event about the watched device to the ldo_notify method of the
 * child device, then releases the context.
 */
2432 static int mdt_notify(struct obd_device *obd, struct obd_device *watched,
2433 enum obd_notify_event ev, void *data)
2435 struct mdt_device *mdt;
2436 struct lu_device *next;
2437 struct lu_context ctxt;
2441 /*FIXME: allocation here may have some problems :( */
2442 rc = lu_context_init(&ctxt);
2446 mdt = mdt_dev(obd->obd_lu_dev);
2447 next = md2lu_dev(mdt->mdt_child);
2449 lu_context_enter(&ctxt);
2450 rc = next->ld_ops->ldo_notify(&ctxt, next, watched, ev, data);
2451 lu_context_exit(&ctxt);
2453 lu_context_fini(&ctxt);
/*
 * obd method table of the MDT; passed to class_register_type() in
 * mdt_mod_init().
 */
2457 static struct obd_ops mdt_obd_device_ops = {
2458 .o_owner = THIS_MODULE,
2459 .o_connect = mdt_obd_connect,
2460 .o_disconnect = mdt_obd_disconnect,
2461 .o_init_export = mdt_init_export, /* By Huang Hua*/
2462 .o_destroy_export = mdt_destroy_export, /* By Huang Hua*/
2463 .o_notify = mdt_notify,
/*
 * Device free method of the MDT type (installed as ldto_device_free).
 * Only the conversion of d to the enclosing mdt_device is visible in this
 * listing; presumably the device is finalised and freed - confirm.
 */
2466 static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
2468 struct mdt_device *m = mdt_dev(d);
/*
 * Device alloc method of the MDT type (installed as ldto_device_alloc).
 * Initialises the new mdt_device through mdt_init0() and returns its
 * embedded lu_device. Two error results are visible: ERR_PTR(result)
 * carrying the mdt_init0() result, and ERR_PTR(-ENOMEM) (presumably for a
 * failed allocation; the allocation line is not visible).
 */
2474 static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
2475 struct lu_device_type *t,
2476 struct lustre_cfg *cfg)
2478 struct lu_device *l;
2479 struct mdt_device *m;
2485 l = &m->mdt_md_dev.md_lu_dev;
2486 result = mdt_init0(ctx, m, t, cfg);
2489 l = ERR_PTR(result);
2492 l = ERR_PTR(-ENOMEM);
2497 * context key constructor/destructor
/*
 * Constructor for the MDT context key: allocates the per-thread
 * mdt_thread_info and records the owning context in it. A compile-time
 * assertion keeps the structure no larger than one page. The
 * ERR_PTR(-ENOMEM) result is presumably the allocation failure path; the
 * condition is not visible in this listing.
 */
2500 static void *mdt_thread_init(const struct lu_context *ctx,
2501 struct lu_context_key *key)
2503 struct mdt_thread_info *info;
2506 * check that no high order allocations are incurred.
2508 CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
2509 OBD_ALLOC_PTR(info);
2511 info->mti_ctxt = ctx;
2513 info = ERR_PTR(-ENOMEM);
/*
 * Destructor for the MDT context key; data is the mdt_thread_info created
 * by mdt_thread_init(). The release statement itself is not visible in
 * this listing.
 */
2517 static void mdt_thread_fini(const struct lu_context *ctx,
2518 struct lu_context_key *key, void *data)
2520 struct mdt_thread_info *info = data;
/*
 * Context key under which each lu_context stores its mdt_thread_info; it
 * is looked up with lu_context_key_get() in the request paths above.
 */
2524 static struct lu_context_key mdt_thread_key = {
2525 .lct_init = mdt_thread_init,
2526 .lct_fini = mdt_thread_fini
/* Device type init: registers the MDT context key and returns the result. */
2529 static int mdt_type_init(struct lu_device_type *t)
2531 return lu_context_key_register(&mdt_thread_key);
/* Device type fini: removes the MDT context key registered by mdt_type_init(). */
2534 static void mdt_type_fini(struct lu_device_type *t)
2536 lu_context_key_degister(&mdt_thread_key);
/* Type-level operations of the MDT device type: type init/fini and device alloc/free. */
2539 static struct lu_device_type_operations mdt_device_type_ops = {
2540 .ldto_init = mdt_type_init,
2541 .ldto_fini = mdt_type_fini,
2543 .ldto_device_alloc = mdt_device_alloc,
2544 .ldto_device_free = mdt_device_free
/* Device type descriptor of the MDT: a metadata device named LUSTRE_MDT0_NAME. */
2547 static struct lu_device_type mdt_device_type = {
2548 .ldt_tags = LU_DEVICE_MD,
2549 .ldt_name = LUSTRE_MDT0_NAME,
2550 .ldt_ops = &mdt_device_type_ops
/*
 * lprocfs variable tables for the MDT (per-obd and per-module) and the
 * LPROCFS_INIT_VARS hook that ties them to the mdt prefix used by
 * lprocfs_init_vars() in mdt_mod_init(). The table entries are not
 * visible in this listing.
 */
2553 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
2557 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
2561 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
/*
 * Module init: sets mdt_num_threads to MDT_NUM_THREADS, loads the lprocfs
 * variables and registers the MDT obd type, returning the result of
 * class_register_type().
 */
2563 static int __init mdt_mod_init(void)
2565 struct lprocfs_static_vars lvars;
2567 mdt_num_threads = MDT_NUM_THREADS;
2568 lprocfs_init_vars(mdt, &lvars);
2569 return class_register_type(&mdt_obd_device_ops, NULL,
2570 lvars.module_vars, LUSTRE_MDT0_NAME,
/* Module exit: unregisters the MDT obd type registered by mdt_mod_init(). */
2574 static void __exit mdt_mod_exit(void)
2576 class_unregister_type(LUSTRE_MDT0_NAME);
/*
 * Handler table helpers. DEF_HNDL emits one designated initialiser whose
 * array index is the opcode relative to the first opcode of the slice
 * (prefix_opc - prefix_base); the visible fields it fills are the fail
 * id, the opcode and the flags. The DEF_MDT_HNDL variants fix the prefix
 * to MDS with GETATTR as base and differ only in how the request format
 * is supplied: explicitly, derived from the name (_F), or NULL (_0).
 */
2580 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt) \
2581 [prefix ## _ ## opc - prefix ## _ ## base] = { \
2583 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
2584 .mh_opc = prefix ## _ ## opc, \
2585 .mh_flags = flags, \
2590 #define DEF_MDT_HNDL(flags, name, fn, fmt) \
2591 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
2593 * Request with a format known in advance
2595 #define DEF_MDT_HNDL_F(flags, name, fn) \
2596 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
2598 * Request with a format we do not yet know
2600 #define DEF_MDT_HNDL_0(flags, name, fn) \
2601 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
/*
 * Handlers for the MDS opcode slice, indexed by (opcode - MDS_GETATTR).
 * The first macro argument carries the per-handler flags (HABEO_CORPUS,
 * HABEO_REFERO or 0); _F entries use the request format named after the
 * opcode, _0 entries have no predefined format.
 */
2603 static struct mdt_handler mdt_mds_ops[] = {
2604 DEF_MDT_HNDL_F(0, CONNECT, mdt_connect),
2605 DEF_MDT_HNDL_F(0, DISCONNECT, mdt_disconnect),
2606 DEF_MDT_HNDL_F(0 |HABEO_REFERO, GETSTATUS, mdt_getstatus),
2607 DEF_MDT_HNDL_F(HABEO_CORPUS, GETATTR, mdt_getattr),
2608 DEF_MDT_HNDL_F(HABEO_CORPUS, GETATTR_NAME, mdt_getattr_name),
2609 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, SETXATTR, mdt_setxattr),
2610 DEF_MDT_HNDL_F(HABEO_CORPUS, GETXATTR, mdt_getxattr),
2611 DEF_MDT_HNDL_F(0 |HABEO_REFERO, STATFS, mdt_statfs),
2612 DEF_MDT_HNDL_0(HABEO_CORPUS, READPAGE, mdt_readpage),
2613 DEF_MDT_HNDL_F(0, REINT, mdt_reint),
2614 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, CLOSE, mdt_close),
2615 DEF_MDT_HNDL_0(0, DONE_WRITING, mdt_done_writing),
2616 DEF_MDT_HNDL_0(0, PIN, mdt_pin),
2617 DEF_MDT_HNDL_0(0, SYNC, mdt_sync),
2618 DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_handle_quotacheck),
2619 DEF_MDT_HNDL_0(0, QUOTACTL, mdt_handle_quotactl)
/*
 * Handlers for the OBD opcode slice, indexed by (opcode - OBD_PING); none
 * of them has a predefined request format.
 */
2622 #define DEF_OBD_HNDL(flags, name, fn) \
2623 DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
2626 static struct mdt_handler mdt_obd_ops[] = {
2627 DEF_OBD_HNDL(0, PING, mdt_obd_ping),
2628 DEF_OBD_HNDL(0, LOG_CANCEL, mdt_obd_log_cancel),
2629 DEF_OBD_HNDL(0, QC_CALLBACK, mdt_obd_qc_callback)
/*
 * Handlers for the LDLM opcode slice, indexed by (opcode - LDLM_ENQUEUE).
 * The fail id suffix is empty for this slice; only ENQUEUE has a
 * predefined request format.
 */
2632 #define DEF_DLM_HNDL_0(flags, name, fn) \
2633 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
2634 #define DEF_DLM_HNDL_F(flags, name, fn) \
2635 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
2637 static struct mdt_handler mdt_dlm_ops[] = {
2638 DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
2639 DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT, mdt_convert),
2640 DEF_DLM_HNDL_0(0, BL_CALLBACK, mdt_bl_callback),
2641 DEF_DLM_HNDL_0(0, CP_CALLBACK, mdt_cp_callback)
/* Handler table for the LLOG opcode slice; no entries are visible in this listing. */
2644 static struct mdt_handler mdt_llog_ops[] = {
/*
 * Top-level dispatch table: each slice covers one opcode range (MDS, OBD,
 * LDLM, LLOG), bounded by mos_opc_start and mos_opc_end, and points at
 * the handler array for that range.
 */
2647 static struct mdt_opc_slice mdt_handlers[] = {
2649 .mos_opc_start = MDS_GETATTR,
2650 .mos_opc_end = MDS_LAST_OPC,
2651 .mos_hs = mdt_mds_ops
2654 .mos_opc_start = OBD_PING,
2655 .mos_opc_end = OBD_LAST_OPC,
2656 .mos_hs = mdt_obd_ops
2659 .mos_opc_start = LDLM_ENQUEUE,
2660 .mos_opc_end = LDLM_LAST_OPC,
2661 .mos_hs = mdt_dlm_ops
2664 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
2665 .mos_opc_end = LLOG_LAST_OPC,
2666 .mos_hs = mdt_llog_ops
/*
 * Module metadata, the read-only (0444) mdt_num_threads module parameter,
 * and registration of the module init and exit entry points.
 */
2673 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2674 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT0_NAME")");
2675 MODULE_LICENSE("GPL");
2677 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
2678 "number of mdt service threads to start");
2680 cfs_module(mdt, "0.1.0", mdt_mod_init, mdt_mod_exit);