1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/mdt/mdt_handler.c
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
13 * Author: Huang Hua <huanghua@clusterfs.com>
15 * This file is part of the Lustre file system, http://www.lustre.org
16 * Lustre is a trademark of Cluster File Systems, Inc.
18 * You may have signed or agreed to another license before downloading
19 * this software. If so, you are bound by the terms and conditions
20 * of that agreement, and the following does not apply to you. See the
21 * LICENSE file included with this distribution for more information.
23 * If you did not agree to a different license, then this copy of Lustre
24 * is open source software; you can redistribute it and/or modify it
25 * under the terms of version 2 of the GNU General Public License as
26 * published by the Free Software Foundation.
28 * In either case, Lustre is distributed in the hope that it will be
29 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 * license text for more details.
35 # define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_MDS
39 #include <linux/module.h>
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
44 * struct OBD_{ALLOC,FREE}*()
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
55 #include <dt_object.h>
56 #include <lustre_mds.h>
57 #include <lustre_mdt.h>
58 #include "mdt_internal.h"
59 #include <linux/lustre_acl.h>
61 * Initialized in mdt_mod_init().
63 unsigned long mdt_num_threads;
65 /* ptlrpc request handler for MDT. All handlers are
66 * grouped into several slices - struct mdt_opc_slice,
67 * and stored in an array - mdt_handlers[].
70 /* The name of this handler. */
72 /* Fail id for this handler, checked at the beginning of this handler*/
74 /* Operation code for this handler */
76 /* flags are listed in enum mdt_handler_flags below. */
78 /* The actual handler function to execute. */
79 int (*mh_act)(struct mdt_thread_info *info);
80 /* Request format for this request. */
81 const struct req_format *mh_fmt;
84 enum mdt_handler_flags {
86 * struct mdt_body is passed in the incoming message, and object
87 * identified by this fid exists on disk.
89 * "habeo corpus" == "I have a body"
91 HABEO_CORPUS = (1 << 0),
93 * struct ldlm_request is passed in the incoming message.
95 * "habeo clavis" == "I have a key"
97 HABEO_CLAVIS = (1 << 1),
99 * this request has fixed reply format, so that reply message can be
100 * packed by generic code.
102 * "habeo refero" == "I have a reply"
104 HABEO_REFERO = (1 << 2),
106 * this request will modify something, so check whether the filesystem
107 * is readonly or not, then return -EROFS to client asap if necessary.
109 * "mutabor" == "I shall modify"
114 struct mdt_opc_slice {
117 struct mdt_handler *mos_hs;
120 static struct mdt_opc_slice mdt_regular_handlers[];
121 static struct mdt_opc_slice mdt_readpage_handlers[];
122 static struct mdt_opc_slice mdt_seq_handlers[];
123 static struct mdt_opc_slice mdt_fld_handlers[];
125 static struct mdt_device *mdt_dev(struct lu_device *d);
126 static int mdt_regular_handle(struct ptlrpc_request *req);
127 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
129 static struct lu_object_operations mdt_obj_ops;
/* Intent-disposition helpers: the DISP_* bits in ldlm_reply.lock_policy_res1
 * tell the client which phases of an intent operation were executed and with
 * what outcome.  (NOTE(review): this extract is missing interleaving lines —
 * braces, ENTRY/RETURN — preserved as-is.) */

/* Return non-zero iff disposition bit(s) @flag are set in @rep. */
131 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
135 return (rep->lock_policy_res1 & flag);

/* Clear @flag both in the thread-info opdata and in the ldlm reply. */
138 void mdt_clear_disposition(struct mdt_thread_info *info,
139 struct ldlm_reply *rep, int flag)
142 info->mti_opdata &= ~flag;
144 rep->lock_policy_res1 &= ~flag;

/* Set @flag both in the thread-info opdata and in the ldlm reply. */
147 void mdt_set_disposition(struct mdt_thread_info *info,
148 struct ldlm_reply *rep, int flag)
151 info->mti_opdata |= flag;
153 rep->lock_policy_res1 |= flag;

/* Non-zero if the object header carries LOHA_REMOTE, i.e. the object
 * lives on another MDS (cross-ref case). */
156 static int mdt_is_remote_object(struct mdt_object *o)
158 return (o->mot_header.loh_attr & LOHA_REMOTE);
/* MDS_GETSTATUS handler: ask the underlying md device for the root fid and
 * pack it into the reply mdt_body.  NOTE(review): extract is missing lines
 * (fid destination argument of mdo_root_get, rc declaration, RETURN). */
162 static int mdt_getstatus(struct mdt_thread_info *info)
164 struct md_device *next = info->mti_mdt->mdt_child;
166 struct mdt_body *body;
        /* Fault-injection point for GETSTATUS reply packing. */
170 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
173 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
174 rc = next->md_ops->mdo_root_get(info->mti_ctxt,
        /* Mark the fid in the reply body as valid. */
177 body->valid |= OBD_MD_FLID;
/* MDS_STATFS handler: query filesystem statistics from the lower md layer
 * into the kernel statfs buffer and pack them into the reply obd_statfs. */
183 static int mdt_statfs(struct mdt_thread_info *info)
185 struct md_device *next = info->mti_mdt->mdt_child;
186 struct obd_statfs *osfs;
191 /* This will trigger a watchdog timeout */
192 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
193 (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
        /* Fault-injection point for STATFS reply packing. */
196 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
199 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
200 /* XXX max_age optimisation is needed here. See mds_statfs */
201 rc = next->md_ops->mdo_statfs(info->mti_ctxt,
202 next, &info->mti_u.ksfs);
        /* Convert kernel statfs into wire obd_statfs format. */
203 statfs_pack(osfs, &info->mti_u.ksfs);
/* Copy Size-on-MDS attributes (size/blocks) from @attr into reply body @b,
 * but only for regular files with SOM enabled on @o. */
209 void mdt_pack_size2body(struct mdt_body *b, const struct lu_attr *attr,
210 struct mdt_object *o)
212 /* Check if Size-on-MDS is enabled. */
213 if (S_ISREG(attr->la_mode) && mdt_sizeonmds_enabled(o)) {
214 b->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
215 b->size = attr->la_size;
216 b->blocks = attr->la_blocks;
/* Translate internal lu_attr attributes plus @fid into the wire-format
 * mdt_body @b, setting the corresponding OBD_MD_* valid bits. */
220 void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
221 const struct lu_fid *fid)
223 /*XXX should pack the reply body according to lu_valid*/
224 b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID |
225 OBD_MD_FLGID | OBD_MD_FLTYPE |
226 OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
227 OBD_MD_FLATIME | OBD_MD_FLMTIME ;
        /* For regular files size/blocks come from the OSTs (or SOM), so
         * only claim them here for non-regular objects. */
229 if (!S_ISREG(attr->la_mode))
230 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
232 b->atime = attr->la_atime;
233 b->mtime = attr->la_mtime;
234 b->ctime = attr->la_ctime;
235 b->mode = attr->la_mode;
236 b->size = attr->la_size;
237 b->blocks = attr->la_blocks;
238 b->uid = attr->la_uid;
239 b->gid = attr->la_gid;
240 b->flags = attr->la_flags;
241 b->nlink = attr->la_nlink;
242 b->rdev = attr->la_rdev;
        /* NOTE(review): extract appears to be missing the line that copies
         * @fid into the body before this valid bit is set — confirm. */
246 b->valid |= OBD_MD_FLID;
247 CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
248 PFID(fid), b->nlink, b->mode, b->size);
/* Non-zero if the client asked for striping EA: OBD_MD_FLEASIZE for regular
 * files, OBD_MD_FLDIREA for directories (mode taken from @la). */
252 static inline int mdt_body_has_lov(const struct lu_attr *la,
253 const struct mdt_body *body)
255 return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
256 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
/* Core getattr: fetch attributes (and, on request, LOV/LMV EA, symlink
 * target, ACL) for object @o and pack them into the reply capsule.
 * Handles -EREMOTE by returning only fid + OBD_MD_MDS so the client can
 * redirect to the owning MDS.  NOTE(review): this extract is missing many
 * lines (braces, else-branches, RETURNs, local declarations for rc/buffer/
 * length); preserved byte-identical. */
259 static int mdt_getattr_internal(struct mdt_thread_info *info,
260 struct mdt_object *o)
262 struct md_object *next = mdt_object_child(o);
263 const struct mdt_body *reqbody = info->mti_body;
264 struct ptlrpc_request *req = mdt_info_req(info);
265 struct md_attr *ma = &info->mti_attr;
266 struct lu_attr *la = &ma->ma_attr;
267 struct req_capsule *pill = &info->mti_pill;
268 const struct lu_context *ctxt = info->mti_ctxt;
269 struct mdt_body *repbody;
275 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
278 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
279 repbody->eadatasize = 0;
280 repbody->aclsize = 0;
        /* Decide which EA to fetch: LMV for striped dirs (OBD_MD_MEA),
         * otherwise LOV; both reuse the RMF_MDT_MD reply buffer. */
282 if (reqbody->valid & OBD_MD_MEA) {
283 /* Assumption: MDT_MD size is enough for lmv size FIXME */
284 ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD);
285 ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD,
287 ma->ma_need = MA_INODE | MA_LMV;
289 ma->ma_need = MA_INODE | MA_LOV ;
290 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
291 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
294 rc = mo_attr_get(ctxt, next, ma);
295 if (rc == -EREMOTE) {
296 /* This object is located on remote node.*/
297 repbody->fid1 = *mdt_object_fid(o);
298 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
301 CERROR("getattr error for "DFID": %d\n",
302 PFID(mdt_object_fid(o)), rc);
306 if (ma->ma_valid & MA_INODE)
307 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
        /* Pack striping EA if the client asked for it and we got one. */
311 if (mdt_body_has_lov(la, reqbody)) {
312 if (ma->ma_valid & MA_LOV) {
313 LASSERT(ma->ma_lmm_size);
314 mdt_dump_lmm(D_INFO, ma->ma_lmm);
315 repbody->eadatasize = ma->ma_lmm_size;
316 if (S_ISDIR(la->la_mode))
317 repbody->valid |= OBD_MD_FLDIREA;
319 repbody->valid |= OBD_MD_FLEASIZE;
321 if (ma->ma_valid & MA_LMV) {
322 LASSERT(S_ISDIR(la->la_mode));
323 repbody->eadatasize = ma->ma_lmv_size;
324 repbody->valid |= OBD_MD_FLDIREA;
325 repbody->valid |= OBD_MD_MEA;
        /* Symlink: read the target into the (reused) EA buffer; on success
         * rc is the target length, NUL-terminated below. */
327 } else if (S_ISLNK(la->la_mode) &&
328 reqbody->valid & OBD_MD_LINKNAME) {
329 rc = mo_readlink(ctxt, next, ma->ma_lmm, ma->ma_lmm_size);
331 CERROR("readlink failed: %d\n", rc);
334 repbody->valid |= OBD_MD_LINKNAME;
335 repbody->eadatasize = rc + 1;
336 ((char*)ma->ma_lmm)[rc] = 0; /* NULL terminate */
337 CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
338 (char*)ma->ma_lmm, rc);
        /* Report the server's current max EA / llog-cookie sizes so the
         * client can size future request buffers. */
343 if (reqbody->valid & OBD_MD_FLMODEASIZE) {
344 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
345 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
346 repbody->valid |= OBD_MD_FLMODEASIZE;
347 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
348 "MAX_COOKIE to : %d:%d\n",
350 repbody->max_cookiesize);
        /* Optionally fetch the POSIX access ACL xattr into the reply;
         * -ENODATA/-EOPNOTSUPP are tolerated (no ACL / not supported). */
353 #ifdef CONFIG_FS_POSIX_ACL
354 if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
355 (reqbody->valid & OBD_MD_FLACL)) {
356 buffer = req_capsule_server_get(pill, &RMF_ACL);
357 length = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
359 rc = mo_xattr_get(ctxt, next, buffer,
360 length, XATTR_NAME_ACL_ACCESS);
362 if (rc == -ENODATA || rc == -EOPNOTSUPP)
365 CERROR("got acl size: %d\n", rc);
367 repbody->aclsize = rc;
368 repbody->valid |= OBD_MD_FLACL;
/* MDS_GETATTR handler: getattr on the fid-identified object from the request
 * body, then shrink variable-size reply buffers to their actual length. */
377 static int mdt_getattr(struct mdt_thread_info *info)
380 struct mdt_object *obj;
382 obj = info->mti_object;
383 LASSERT(obj != NULL);
384 LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
387 rc = mdt_getattr_internal(info, obj);
        /* Trim EA/ACL reply buffers down to what was actually packed. */
388 mdt_shrink_reply(info, REPLY_REC_OFF + 1);
/* MDS_IS_SUBDIR handler: ask the md layer whether mti_tmp_fid2 is an
 * ancestor of this object; the last checked parent fid is returned in
 * repbody->fid1 for continuing the walk on a remote MDS. */
392 static int mdt_is_subdir(struct mdt_thread_info *info)
394 struct mdt_object *obj = info->mti_object;
395 struct req_capsule *pill = &info->mti_pill;
396 struct mdt_body *repbody;
399 obj = info->mti_object;
400 LASSERT(obj != NULL);
401 LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
404 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
407 * We save last checked parent fid to @repbody->fid1 for remote
410 rc = mdo_is_subdir(info->mti_ctxt, mdt_object_child(obj),
411 &info->mti_tmp_fid2, &repbody->fid1);
416 * Save error code to ->mode. Later it it is used for detecting the case
        /* Result is transported via ->mode/->fid1 valid bits. */
420 repbody->valid = OBD_MD_FLMODE;
423 repbody->valid |= OBD_MD_FLID;
430 * UPDATE lock should be taken against parent, and be release before exit;
431 * child_bits lock should be taken against child, and be returned back:
432 * (1)normal request should release the child lock;
433 * (2)intent request will grant the lock to client.
/* Lookup @name under the parent object, lock the child with @child_bits and
 * getattr it.  Parent UPDATE lock is released before return; the child lock
 * is kept in @lhc for the caller (intent grants it to the client, plain
 * getattr-by-name releases it).  Handles the RESENT case by reusing the lock
 * handle from the original request instead of re-locking, and the empty-name
 * case (cross-MDS partial getattr on the child only).
 * NOTE(review): extract is missing many lines (braces, if/else bodies,
 * is_resent branches, labels out_child/out_parent, declarations for rc/
 * is_resent/name/ma); preserved byte-identical. */
435 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
436 struct mdt_lock_handle *lhc,
438 struct ldlm_reply *ldlm_rep)
440 struct ptlrpc_request *req = mdt_info_req(info);
441 struct mdt_object *parent = info->mti_object;
442 struct mdt_object *child;
443 struct md_object *next = mdt_object_child(info->mti_object);
444 struct lu_fid *child_fid = &info->mti_tmp_fid1;
447 struct mdt_lock_handle *lhp;
448 struct ldlm_lock *lock;
        /* A used lock handle here means this is a RESENT request that was
         * already granted a lock on the first pass. */
451 is_resent = lustre_handle_is_used(&lhc->mlh_lh);
453 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
455 LASSERT(info->mti_object != NULL);
456 name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
460 CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
461 PFID(mdt_object_fid(parent)), name, ldlm_rep);
463 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
464 if (strlen(name) == 0) {
465 /* only getattr on the child. parent is on another node. */
466 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
468 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
469 ", ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep);
472 /* Do not take lock for resent case. */
473 lock = ldlm_handle2lock(&lhc->mlh_lh);
475 CERROR("Invalid lock handle "LPX64"\n",
479 LASSERT(fid_res_name_eq(mdt_object_fid(child),
480 &lock->l_resource->lr_name));
484 mdt_lock_handle_init(lhc);
485 lhc->mlh_mode = LCK_CR;
488 * Object's name is on another MDS, no lookup lock is
489 * needed here but update is.
491 child_bits &= ~MDS_INODELOCK_LOOKUP;
492 child_bits |= MDS_INODELOCK_UPDATE;
493 rc = mdt_object_lock(info, child, lhc, child_bits);
496 /* Finally, we can get attr for child. */
497 rc = mdt_getattr_internal(info, child);
499 mdt_object_unlock(info, child, lhc, 1);
504 /*step 1: lock parent */
505 lhp = &info->mti_lh[MDT_LH_PARENT];
506 lhp->mlh_mode = LCK_CR;
507 rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
511 /*step 2: lookup child's fid by name */
512 rc = mdo_lookup(info->mti_ctxt, next, name, child_fid);
515 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
516 GOTO(out_parent, rc);
518 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
520 *step 3: find the child object by fid & lock it.
521 * regardless if it is local or remote.
523 child = mdt_object_find(info->mti_ctxt, info->mti_mdt, child_fid);
525 GOTO(out_parent, rc = PTR_ERR(child));
527 /* Do not take lock for resent case. */
528 lock = ldlm_handle2lock(&lhc->mlh_lh);
530 CERROR("Invalid lock handle "LPX64"\n",
534 LASSERT(fid_res_name_eq(child_fid,
535 &lock->l_resource->lr_name));
538 mdt_lock_handle_init(lhc);
539 lhc->mlh_mode = LCK_CR;
        /* cr_lock: converts UPDATE->LOOKUP bits for cross-ref children. */
540 rc = mdt_object_cr_lock(info, child, lhc, child_bits);
545 /* finally, we can get attr for child. */
546 rc = mdt_getattr_internal(info, child);
548 mdt_object_unlock(info, child, lhc, 1);
550 struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh);
552 struct ldlm_res_id *res_id;
553 struct mdt_body *repbody;
556 /* Debugging code. */
557 res_id = &lock->l_resource->lr_name;
558 LDLM_DEBUG(lock, "we will return this lock client\n");
559 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
560 &lock->l_resource->lr_name),
561 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
562 (unsigned long)res_id->name[0],
563 (unsigned long)res_id->name[1],
564 (unsigned long)res_id->name[2],
565 PFID(mdt_object_fid(child)));
567 /* Pack Size-on-MDS inode attributes to the body if
568 * update lock is given. */
569 repbody = req_capsule_server_get(&info->mti_pill,
571 ma = &info->mti_attr.ma_attr;
572 if (lock->l_policy_data.l_inodebits.bits &
573 MDS_INODELOCK_UPDATE)
574 mdt_pack_size2body(repbody, ma, child);
582 mdt_object_put(info->mti_ctxt, child);
        /* Parent UPDATE lock is always dropped before returning. */
584 mdt_object_unlock(info, parent, lhp, 1);
589 /* normal handler: should release the child lock */
590 static int mdt_getattr_name(struct mdt_thread_info *info)
592 struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
597 rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
598 if (lustre_handle_is_used(&lhc->mlh_lh)) {
599 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
600 lhc->mlh_lh.cookie = 0;
602 mdt_shrink_reply(info, REPLY_REC_OFF + 1);
606 static struct lu_device_operations mdt_lu_ops;

/* True iff @d is an MDT device, recognized by its ops vector. */
608 static int lu_device_is_mdt(struct lu_device *d)
610 return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);

/* Downcast lu_device -> mdt_device (asserts the device really is an MDT). */
613 static inline struct mdt_device *mdt_dev(struct lu_device *d)
615 LASSERT(lu_device_is_mdt(d));
616 return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);

/* MDS_CONNECT handler: delegate to the generic target connect code, then
 * cache the mdt_device pointer of the new export in thread info. */
619 static int mdt_connect(struct mdt_thread_info *info)
622 struct ptlrpc_request *req;
624 req = mdt_info_req(info);
625 rc = target_handle_connect(req, mdt_regular_handle);
627 LASSERT(req->rq_export != NULL);
628 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);

/* MDS_DISCONNECT handler: generic target disconnect. */
633 static int mdt_disconnect(struct mdt_thread_info *info)
635 return target_handle_disconnect(mdt_info_req(info));
/* Push the pages described by @rdpg to the client via a bulk PUT transfer.
 * On timeout or short transfer the client export is evicted.
 * NOTE(review): extract is missing lines (rc/i/tmpcount/tmpsize decls,
 * abort_bulk/out labels' structure); preserved byte-identical. */
638 static int mdt_sendpage(struct mdt_thread_info *info,
639 struct lu_rdpg *rdpg)
641 struct ptlrpc_request *req = mdt_info_req(info);
642 struct ptlrpc_bulk_desc *desc;
643 struct l_wait_info *lwi = &info->mti_u.rdpg.mti_wait_info;
650 desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
653 GOTO(out, rc = -ENOMEM);
        /* Attach each page; the last one may be partial (tmpsize). */
655 for (i = 0, tmpcount = rdpg->rp_count;
656 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
657 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
658 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
661 LASSERT(desc->bd_nob == rdpg->rp_count);
662 rc = ptlrpc_start_bulk_transfer(desc);
666 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
667 GOTO(abort_bulk, rc);
        /* Wait (bounded) for the bulk transfer to finish. */
669 *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
670 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
671 LASSERT (rc == 0 || rc == -ETIMEDOUT);
674 if (desc->bd_success &&
675 desc->bd_nob_transferred == rdpg->rp_count)
678 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
681 DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
682 (rc == -ETIMEDOUT) ? "timeout" : "network error",
683 desc->bd_nob_transferred, rdpg->rp_count,
684 req->rq_export->exp_client_uuid.uuid,
685 req->rq_export->exp_connection->c_remote_uuid.uuid);
        /* Failed bulk => evict the client to force reconnect/recovery. */
687 class_fail_export(req->rq_export);
691 ptlrpc_abort_bulk(desc);
693 ptlrpc_free_bulk(desc);
698 #ifdef HAVE_SPLIT_SUPPORT
700 * Retrieve dir entry from the page and insert it to the
701 * slave object, actually, this should be in osd layer,
702 * but since it will not in the final product, so just do
703 * it here and do not define more moo api anymore for
/* Walk the lu_dirpage in @page and insert every entry except "." and ".."
 * into this (slave) directory object.  NOTE(review): extract is missing
 * lines (rc/name decls, name NUL-termination, OBD_ALLOC failure check,
 * closing braces); preserved byte-identical. */
706 static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
708 struct mdt_object *object = info->mti_object;
709 struct lu_dirpage *dp;
710 struct lu_dirent *ent;
714 /* Disable trans for this name insert, since it will
715 * include many trans for this */
716 info->mti_no_need_trans = 1;
718 dp = page_address(page);
719 for (ent = lu_dirent_start(dp); ent != NULL;
720 ent = lu_dirent_next(ent)) {
721 struct lu_fid *lf = &ent->lde_fid;
723 /* FIXME: multi-trans for this name insert */
724 if (strncmp(ent->lde_name, ".", ent->lde_namelen) &&
725 strncmp(ent->lde_name, "..", ent->lde_namelen)) {
727 /* FIXME: Here we allocate name for each name,
728 * maybe stupid, but can not find better way.
729 * will find better way */
730 OBD_ALLOC(name, ent->lde_namelen + 1);
731 memcpy(name, ent->lde_name, ent->lde_namelen);
732 rc = mdo_name_insert(info->mti_ctxt,
733 md_object_next(&object->mot_obj),
735 OBD_FREE(name, ent->lde_namelen + 1);

/* Callback fired when a bulk GET in mdt_writepage times out. */
745 static int mdt_bulk_timeout(void *data)
749 CERROR("mdt bulk transfer timeout \n");
/* MDS_WRITEPAGE handler (split support): pull one page of directory entries
 * from the client via bulk GET (offset in reqbody->size, length in
 * reqbody->nlink) and replay them into the directory with
 * mdt_write_dir_page().  NOTE(review): extract is missing lines (rc/page
 * decls, lwi allocation, several error branches and cleanup labels);
 * preserved byte-identical. */
754 static int mdt_writepage(struct mdt_thread_info *info)
756 struct ptlrpc_request *req = mdt_info_req(info);
757 struct mdt_body *reqbody;
758 struct l_wait_info *lwi;
759 struct ptlrpc_bulk_desc *desc;
765 reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
769 desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
773 /* allocate the page for the desc */
774 page = alloc_pages(GFP_KERNEL, 0);
776 GOTO(desc_cleanup, rc = -ENOMEM);
778 CDEBUG(D_INFO, "Received page offset %d size %d \n",
779 (int)reqbody->size, (int)reqbody->nlink);
        /* reqbody->size carries the page offset, ->nlink the byte count. */
781 ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size,
782 (int)reqbody->nlink);
784 /* FIXME: following parts are copied from ost_brw_write */
786 /* Check if client was evicted while we were doing i/o before touching
790 GOTO(cleanup_page, rc = -ENOMEM);
792 if (desc->bd_export->exp_failed)
795 rc = ptlrpc_start_bulk_transfer (desc);
797 *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
798 mdt_bulk_timeout, desc);
        /* Wait for bulk completion, timeout, or client eviction. */
799 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
800 desc->bd_export->exp_failed, lwi);
801 LASSERT(rc == 0 || rc == -ETIMEDOUT);
802 if (rc == -ETIMEDOUT) {
803 DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
804 ptlrpc_abort_bulk(desc);
805 } else if (desc->bd_export->exp_failed) {
806 DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
808 ptlrpc_abort_bulk(desc);
809 } else if (!desc->bd_success ||
810 desc->bd_nob_transferred != desc->bd_nob) {
811 DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
813 "truncated" : "network error on",
814 desc->bd_nob_transferred, desc->bd_nob);
815 /* XXX should this be a different errno? */
819 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
822 GOTO(cleanup_lwi, rc);
823 rc = mdt_write_dir_page(info, page);
828 __free_pages(page, 0);
830 ptlrpc_free_bulk(desc);
/* MDS_READPAGE handler: allocate rp_npages pages, fill them with directory
 * data from the md layer, and ship them to the client via mdt_sendpage().
 * reqbody->size is the start hash/offset, reqbody->nlink the byte count.
 * NOTE(review): extract is missing lines (rc/rc1/i decls, free_rdpg label
 * position, some error returns); preserved byte-identical. */
835 static int mdt_readpage(struct mdt_thread_info *info)
837 struct mdt_object *object = info->mti_object;
838 struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
839 struct mdt_body *reqbody;
840 struct mdt_body *repbody;
845 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
848 reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
849 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
850 if (reqbody == NULL || repbody == NULL)
854 * prepare @rdpg before calling lower layers and transfer itself. Here
855 * reqbody->size contains offset of where to start to read and
856 * reqbody->nlink contains number bytes to read.
858 rdpg->rp_hash = reqbody->size;
        /* Guard against the hash being truncated by the rp_hash type. */
859 if ((__u64)rdpg->rp_hash != reqbody->size) {
860 CERROR("Invalid hash: %#llx != %#llx\n",
861 (__u64)rdpg->rp_hash, reqbody->size);
864 rdpg->rp_count = reqbody->nlink;
865 rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
866 OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
867 if (rdpg->rp_pages == NULL)
870 for (i = 0; i < rdpg->rp_npages; ++i) {
871 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
872 if (rdpg->rp_pages[i] == NULL)
873 GOTO(free_rdpg, rc = -ENOMEM);
876 /* call lower layers to fill allocated pages with directory data */
877 rc = mo_readpage(info->mti_ctxt, mdt_object_child(object), rdpg);
885 /* send pages to client */
886 rc = mdt_sendpage(info, rdpg);
        /* Cleanup: free any pages that were allocated, then the array. */
891 for (i = 0; i < rdpg->rp_npages; i++)
892 if (rdpg->rp_pages[i] != NULL)
893 __free_pages(rdpg->rp_pages[i], 0);
894 OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
896 MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
898 return rc ? rc : rc1;
/* Common reintegration path: unpack the reint record for @op, size and pack
 * the reply capsule, then either reconstruct the reply (RESENT with matching
 * xid in the client's last_rcvd slot) or execute the operation via
 * mdt_reint_rec().  NOTE(review): extract is missing lines (rc decl,
 * op parameter line, RETURNs); preserved byte-identical. */
901 static int mdt_reint_internal(struct mdt_thread_info *info,
902 struct mdt_lock_handle *lhc,
905 struct req_capsule *pill = &info->mti_pill;
906 struct mdt_device *mdt = info->mti_mdt;
907 struct ptlrpc_request *req = mdt_info_req(info);
911 rc = mdt_reint_unpack(info, op);
913 CERROR("Can't unpack reint, rc %d\n", rc);
        /* Size variable reply fields to the server maxima before packing. */
918 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
919 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
920 mdt->mdt_max_mdsize);
921 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
922 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
923 mdt->mdt_max_cookiesize);
924 rc = req_capsule_pack(pill);
926 CERROR("Can't pack response, rc %d\n", rc);
931 * Check this after packing response, because after we fail here without
932 * allocating response, caller anyway may want to get ldlm_reply from it
935 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
        /* RESENT request with matching xid: reply can be reconstructed from
         * the saved client data rather than re-executing the operation. */
938 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
939 struct mdt_client_data *mcd;
941 mcd = req->rq_export->exp_mdt_data.med_mcd;
942 if (mcd->mcd_last_xid == req->rq_xid) {
943 mdt_reconstruct(info, lhc);
944 RETURN(lustre_msg_get_status(req->rq_repmsg));
946 DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
949 rc = mdt_reint_rec(info, lhc);
/* Read the REINT opcode from the request and, if it is a supported one,
 * extend the request capsule to that opcode's format.  Returns the opcode
 * (or an error for unsupported opcodes — NOTE(review): the error-return
 * lines are missing from this extract). */
954 static long mdt_reint_opcode(struct mdt_thread_info *info,
955 const struct req_format **fmt)
961 ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
964 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
965 if (opc < REINT_MAX && fmt[opc] != NULL)
966 req_capsule_extend(&info->mti_pill, fmt[opc]);
968 CERROR("Unsupported opc: %ld\n", opc);

/* MDS_REINT handler: dispatch to mdt_reint_internal with the per-opcode
 * request format; no client lock handle is available on this path. */
973 static int mdt_reint(struct mdt_thread_info *info)
        /* Table mapping REINT opcodes to their request formats. */
978 static const struct req_format *reint_fmts[REINT_MAX] = {
979 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
980 [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
981 [REINT_LINK] = &RQF_MDS_REINT_LINK,
982 [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
983 [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
984 [REINT_OPEN] = &RQF_MDS_REINT_OPEN
989 opc = mdt_reint_opcode(info, reint_fmts);
992 * No lock possible here from client to pass it to reint code
995 rc = mdt_reint_internal(info, NULL, opc);
999 info->mti_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
1003 /* TODO these two methods not available now. */
1005 /* this should sync the whole device */
1006 static int mdt_device_sync(struct mdt_thread_info *info)
1011 /* this should sync this object */
1012 static int mdt_object_sync(struct mdt_thread_info *info)
/* MDS_SYNC handler: a zero fid sequence means "sync the whole device",
 * otherwise sync the single object and return its fresh attributes.
 * NOTE(review): extract is missing lines (rc decl, branch braces, RETURN);
 * preserved byte-identical. */
1017 static int mdt_sync(struct mdt_thread_info *info)
1019 struct req_capsule *pill = &info->mti_pill;
1020 struct mdt_body *body;
1024 /* The fid may be zero, so we req_capsule_set manually */
1025 req_capsule_set(pill, &RQF_MDS_SYNC);
1027 body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1031 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
1034 if (fid_seq(&body->fid1) == 0) {
1035 /* sync the whole device */
1036 rc = req_capsule_pack(pill);
1038 rc = mdt_device_sync(info);
1040 /* sync an object */
1041 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS|HABEO_REFERO);
1043 rc = mdt_object_sync(info);
1045 struct md_object *next;
1046 const struct lu_fid *fid;
1047 struct lu_attr *la = &info->mti_attr.ma_attr;
        /* After a successful object sync, return up-to-date attributes. */
1049 next = mdt_object_child(info->mti_object);
1050 info->mti_attr.ma_need = MA_INODE;
1051 rc = mo_attr_get(info->mti_ctxt, next,
1054 body = req_capsule_server_get(pill,
1056 fid = mdt_object_fid(info->mti_object);
1057 mdt_pack_attr2body(body, la, fid);
1065 static int mdt_quotacheck_handle(struct mdt_thread_info *info)
1070 static int mdt_quotactl_handle(struct mdt_thread_info *info)
1076 * OBD PING and other handlers.
1078 static int mdt_obd_ping(struct mdt_thread_info *info)
1082 rc = target_handle_ping(mdt_info_req(info));
1086 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
1091 static int mdt_obd_qc_callback(struct mdt_thread_info *info)
/* LDLM callback suite used when granting server-side locks to clients. */
1101 static struct ldlm_callback_suite cbs = {
1102 .lcs_completion = ldlm_server_completion_ast,
1103 .lcs_blocking = ldlm_server_blocking_ast,

/* LDLM_ENQUEUE handler: pass the (already swabbed) dlm request to the
 * generic enqueue code; fall back to req->rq_status when rc is 0. */
1107 static int mdt_enqueue(struct mdt_thread_info *info)
1110 struct ptlrpc_request *req;
1113 * info->mti_dlm_req already contains swapped and (if necessary)
1114 * converted dlm request.
1116 LASSERT(info->mti_dlm_req != NULL);
1118 req = mdt_info_req(info);
1119 info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
1120 rc = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
1121 req, info->mti_dlm_req, &cbs);
1122 return rc ? : req->rq_status;

/* LDLM_CONVERT handler: delegate to the generic lock-convert code. */
1125 static int mdt_convert(struct mdt_thread_info *info)
1128 struct ptlrpc_request *req;
1130 LASSERT(info->mti_dlm_req);
1131 req = mdt_info_req(info);
1132 rc = ldlm_handle_convert0(req, info->mti_dlm_req);
1133 return rc ? : req->rq_status;
1136 static int mdt_bl_callback(struct mdt_thread_info *info)
1138 CERROR("bl callbacks should not happen on MDS\n");
1143 static int mdt_cp_callback(struct mdt_thread_info *info)
1145 CERROR("cp callbacks should not happen on MDS\n");
1151 * sec context handlers
1153 static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
/* Downcast lu_object -> mdt_object (asserts it belongs to an MDT device). */
1158 static struct mdt_object *mdt_obj(struct lu_object *o)
1160 LASSERT(lu_device_is_mdt(o->lo_dev));
1161 return container_of0(o, struct mdt_object, mot_obj.mo_lu);

/* Find-or-create the mdt_object for fid @f in device @d's site cache.
 * NOTE(review): extract is missing the IS_ERR handling / RETURN lines. */
1164 struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
1165 struct mdt_device *d,
1166 const struct lu_fid *f)
1168 struct lu_object *o;
1169 struct mdt_object *m;
1172 o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
1174 m = (struct mdt_object *)o;
/* Take an inodebits DLM lock on @o with bits @ibits, storing the handle in
 * @lh.  For cross-ref stubs (lu_object_exists() < 0) only LOOKUP bits are
 * legal.  NOTE(review): extract is missing the rc decl and RETURN lines. */
1180 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
1181 struct mdt_lock_handle *lh, __u64 ibits)
1183 ldlm_policy_data_t *policy = &info->mti_policy;
1184 struct ldlm_res_id *res_id = &info->mti_res_id;
1185 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1189 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1190 LASSERT(lh->mlh_mode != LCK_MINMODE);
        /* Negative existence => remote (cross-ref) object: must be a pure
         * LOOKUP lock, never UPDATE. */
1191 if (lu_object_exists(&o->mot_obj.mo_lu) < 0) {
1192 LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
1193 LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1195 policy->l_inodebits.bits = ibits;
1197 rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode,

1202 /* lock with cross-ref fixes */
/* Like mdt_object_lock(), but silently converts UPDATE->LOOKUP bits when
 * @o turns out to be a cross-ref (remote) object. */
1203 int mdt_object_cr_lock(struct mdt_thread_info *info, struct mdt_object *o,
1204 struct mdt_lock_handle *lh, __u64 ibits)
1206 if (lu_object_exists(&o->mot_obj.mo_lu) < 0) {
1207 /* cross-ref object fix */
1208 ibits &= ~MDS_INODELOCK_UPDATE;
1209 ibits |= MDS_INODELOCK_LOOKUP;
1211 return mdt_object_lock(info, o, lh, ibits);
1215 * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
1216 * to save this lock in req. when transaction committed, req will be released,
1217 * and lock will, too.
/* Release (or defer to commit) the lock in @lh for object @o. */
1219 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
1220 struct mdt_lock_handle *lh, int decref)
1222 struct ptlrpc_request *req = mdt_info_req(info);
1223 struct lustre_handle *handle = &lh->mlh_lh;
1224 ldlm_mode_t mode = lh->mlh_mode;
1227 if (lustre_handle_is_used(handle)) {
1229 fid_unlock(mdt_object_fid(o), handle, mode);
1231 ptlrpc_save_lock(req, handle, mode);

/* Find the object for fid @f and lock it with @ibits; on lock failure the
 * object reference is dropped.  NOTE(review): extract is missing the rc
 * decl, IS_ERR check and return lines. */
1237 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
1238 const struct lu_fid *f,
1239 struct mdt_lock_handle *lh,
1242 struct mdt_object *o;
1244 o = mdt_object_find(info->mti_ctxt, info->mti_mdt, f);
1248 rc = mdt_object_lock(info, o, lh, ibits);
1250 mdt_object_put(info->mti_ctxt, o);

/* Convenience: unlock @o then drop the object reference. */
1257 void mdt_object_unlock_put(struct mdt_thread_info * info,
1258 struct mdt_object * o,
1259 struct mdt_lock_handle *lh,
1262 mdt_object_unlock(info, o, lh, decref);
1263 mdt_object_put(info->mti_ctxt, o);
/* Look @opc up in the opcode-slice table @supported; each slice covers the
 * half-open range [mos_opc_start, mos_opc_end).  Returns NULL for
 * unsupported opcodes. */
1266 static struct mdt_handler *mdt_handler_find(__u32 opc,
1267 struct mdt_opc_slice *supported)
1269 struct mdt_opc_slice *s;
1270 struct mdt_handler *h;
1273 for (s = supported; s->mos_hs != NULL; s++) {
1274 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1275 h = s->mos_hs + (opc - s->mos_opc_start);
1277 LASSERT(h->mh_opc == opc);
1279 h = NULL; /* unsupported opc */

/* Last xid recorded for this export in its (little-endian, on-disk)
 * mdt_client_data slot. */
1286 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
1288 return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid);

/* Last close-RPC xid for this export, same on-disk source. */
1291 static inline __u64 req_exp_last_close_xid(struct ptlrpc_request *req)
1293 return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_close_xid);

/* Compatibility shims for old-style lock resource names/replies — both are
 * placeholders at this point. */
1296 static int mdt_lock_resname_compat(struct mdt_device *m,
1297 struct ldlm_request *req)
1299 /* XXX something... later. */

1303 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
1305 /* XXX something... later. */
1310 * Generic code handling requests that have struct mdt_body passed in:
1312 * - extract mdt_body from request and save it in @info, if present;
1314 * - create lu_object, corresponding to the fid in mdt_body, and save it in
1317 * - if HABEO_CORPUS flag is set for this request type check whether object
1318 * actually exists on storage (lu_object_exists()).
/* NOTE(review): extract is missing lines (rc decl, IS_ERR branch, RETURNs,
 * else branch for the insane-fid case); preserved byte-identical. */
1321 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1323 const struct mdt_body *body;
1324 struct mdt_object *obj;
1325 const struct lu_context *ctx;
1326 struct req_capsule *pill;
1329 ctx = info->mti_ctxt;
1330 pill = &info->mti_pill;
1332 body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1334 if (fid_is_sane(&body->fid1)) {
1335 obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
        /* HABEO_CORPUS requires the object to exist on storage. */
1337 if ((flags & HABEO_CORPUS) &&
1338 !lu_object_exists(&obj->mot_obj.mo_lu)) {
1339 mdt_object_put(ctx, obj);
1342 info->mti_object = obj;
1348 CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
/* Unpack the request mdt_body (if the format has one) and, for HABEO_REFERO
 * handlers, size variable reply fields to the server maxima and pack the
 * reply capsule.  NOTE(review): extract is missing the rc decl and RETURN. */
1356 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1358 struct req_capsule *pill;
1362 pill = &info->mti_pill;
1364 if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
1365 rc = mdt_body_unpack(info, flags);
1369 if (rc == 0 && (flags & HABEO_REFERO)) {
1370 struct mdt_device *mdt = info->mti_mdt;
1372 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1373 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1374 mdt->mdt_max_mdsize);
1375 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1376 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1377 mdt->mdt_max_cookiesize);
1379 rc = req_capsule_pack(pill);
1385 struct lu_context_key mdt_txn_key;
/*
 * Stamp the reply message with the transaction number, last-committed
 * transno and last xid for this export, under mdt_transno_lock.  Bails out
 * early if the reply was never packed or the handler asked for no transno
 * (MDT_NONEED_TRANSNO).  A non-zero transno combined with an error rc is
 * inconsistent and is zeroed with a CERROR.
 */
1386 static inline void mdt_finish_reply(struct mdt_thread_info *info, int rc)
1388         struct mdt_device *mdt = info->mti_mdt;
1389         struct ptlrpc_request *req = mdt_info_req(info);
1390         struct obd_export *exp = req->rq_export;
1392         /* sometimes the reply message has not been successfully packed */
1393         if (mdt == NULL || req == NULL || req->rq_repmsg == NULL)
1396         if (info->mti_trans_flags & MDT_NONEED_TRANSNO)
1399         /*XXX: assert on this when all code will be finished */
1400         if (rc != 0 && info->mti_transno != 0) {
1401                 info->mti_transno = 0;
1402                 CERROR("Transno is not 0 while rc is %i!\n", rc);
1405         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1406                info->mti_transno, exp->exp_obd->obd_last_committed);
1408         spin_lock(&mdt->mdt_transno_lock);
1409         req->rq_transno = info->mti_transno;
1410         lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
1412         target_committed_to_req(req);
1414         spin_unlock(&mdt->mdt_transno_lock);
1415         lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
1416         //lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
1422 * Invoke handler for this request opc. Also do necessary preprocessing
1423 * (according to handler ->mh_flags), and post-processing (setting of
1424 * ->last_{xid,committed}).
/*
 * Dispatch one request to its handler @h: set up the fail_id, unpack the
 * request / pack the reply according to the handler's flags, reject
 * modifications (MUTABOR) on read-only exports, fix up DLM resource names
 * for old clients (HABEO_CLAVIS), invoke the handler, and post-process the
 * reply.  The handler result is stored in req->rq_status.
 */
1426 static int mdt_req_handle(struct mdt_thread_info *info,
1427                           struct mdt_handler *h, struct ptlrpc_request *req)
1434         LASSERT(h->mh_act != NULL);
1435         LASSERT(h->mh_opc == lustre_msg_get_opc(req->rq_reqmsg));
1436         LASSERT(current->journal_info == NULL);
1438         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1441          * Do not use *_FAIL_CHECK_ONCE() macros, because they will stop
1442          * correct handling of failed req later in ldlm due to doing
1443          * obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED without actually
1444          * correct actions like it is done in target_send_reply_msg().
1446         if (h->mh_fail_id != 0) {
1448                  * Set to info->mti_fail_id to handler fail_id, it will be used
1449                  * later, and better than use default fail_id.
1451                 info->mti_fail_id = h->mh_fail_id;
1452                 if (OBD_FAIL_CHECK(h->mh_fail_id))
1457         flags = h->mh_flags;
1458         LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
1460         if (h->mh_fmt != NULL) {
1461                 req_capsule_set(&info->mti_pill, h->mh_fmt);
1462                 rc = mdt_unpack_req_pack_rep(info, flags);
          /* Mutating ops are refused when the export connected read-only. */
1465         if (rc == 0 && flags & MUTABOR &&
1466             req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1469         if (rc == 0 && flags & HABEO_CLAVIS) {
1470                 struct ldlm_request *dlm_req;
1472                 LASSERT(h->mh_fmt != NULL);
1474                 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
1475                 if (dlm_req != NULL) {
1476                         if (info->mti_mdt->mdt_opts.mo_compat_resname)
1477                                 rc = mdt_lock_resname_compat(info->mti_mdt,
1479                         info->mti_dlm_req = dlm_req;
1481                         CERROR("Can't unpack dlm request\n");
1490                 rc = h->mh_act(info);
1492          * XXX result value is unconditionally shoved into ->rq_status (original
1493          * code sometimes placed error code into ->rq_status, and sometimes
1494          * returned it to the caller). ptlrpc_server_handle_request() doesn't
1495          * check return value anyway.
1497         req->rq_status = rc;
1499         LASSERT(current->journal_info == NULL);
          /* Convert the DLM reply back for old clients if required. */
1501         if (flags & HABEO_CLAVIS && info->mti_mdt->mdt_opts.mo_compat_resname) {
1502                 struct ldlm_reply *dlmrep;
1504                 dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1506                 rc = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
1509         /* If we're DISCONNECTing, the mdt_export_data is already freed */
1511         if (rc == 0 && h->mh_opc != MDS_DISCONNECT) {
1512                 target_committed_to_req(req);
/* Initialize a per-thread lock handle slot to the "unused" state. */
1517 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1519         lh->mlh_lh.cookie = 0ull;
1520         lh->mlh_mode = LCK_MINMODE;
/* Verify a lock handle slot was released before the thread info is reused. */
1523 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1525         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
/*
 * Reset per-thread request state before handling @req: zero the info,
 * mark all reply buffer sizes unset (-1), initialize the lock-handle
 * slots, and wire up the context, transno, device and request capsule.
 */
1528 static void mdt_thread_info_init(struct ptlrpc_request *req,
1529                                  struct mdt_thread_info *info)
1533         memset(info, 0, sizeof(*info));
1535         info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1536         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1537                 info->mti_rep_buf_size[i] = -1;
1539         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1540                 mdt_lock_handle_init(&info->mti_lh[i]);
          /* Default fail point; handlers may override via mh_fail_id. */
1542         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1543         info->mti_ctxt = req->rq_svc_thread->t_ctx;
1544         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1545         /* it can be NULL while CONNECT */
1547                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1548         req_capsule_init(&info->mti_pill, req, RCL_SERVER,
1549                          info->mti_rep_buf_size);
/*
 * Release per-request thread state: finalize the capsule, drop the cached
 * object reference (if any) and check all lock-handle slots were released.
 */
1552 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1556         req_capsule_fini(&info->mti_pill);
1557         if (info->mti_object != NULL) {
1558                 mdt_object_put(info->mti_ctxt, info->mti_object);
1559                 info->mti_object = NULL;
1561         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1562                 mdt_lock_handle_fini(&info->mti_lh[i]);
1566 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
1567 struct obd_device *obd, int *process);
1569 * Handle recovery. Return:
1570 * +1: continue request processing;
1571 * -ve: abort immediately with the given error code;
1572 * 0: send reply with error code in req->rq_status;
/*
 * Recovery gate for an incoming request.  Rejects operations on
 * unconnected exports, sanity-checks that an xid matching the export's
 * last (close) xid carries RESENT/REPLAY, aborts or continues recovery
 * as the obd state dictates, and filters requests during recovery via
 * mds_filter_recovery_request().  See the comment above for the meaning
 * of the return values (+1 / 0 / negative).
 */
1574 static int mdt_recovery(struct ptlrpc_request *req)
1578         struct obd_device *obd;
1582         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1585         case SEC_CTX_INIT_CONT:
1590         if (req->rq_export == NULL) {
1591                 CERROR("operation %d on unconnected MDS from %s\n",
1592                        lustre_msg_get_opc(req->rq_reqmsg),
1593                        libcfs_id2str(req->rq_peer));
1594                 req->rq_status = -ENOTCONN;
1598         /* sanity check: if the xid matches, the request must be marked as a
1599          * resent or replayed */
1600         if (req->rq_xid == req_exp_last_xid(req) ||
1601             req->rq_xid == req_exp_last_close_xid(req)) {
1602                 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
1603                       (MSG_RESENT | MSG_REPLAY))) {
1604                         CERROR("rq_xid "LPU64" matches last_xid, "
1605                                "expected RESENT flag\n", req->rq_xid);
1606                         req->rq_status = -ENOTCONN;
1611         /* else: note the opposite is not always true; a RESENT req after a
1612          * failover will usually not match the last_xid, since it was likely
1613          * never committed. A REPLAYed request will almost never match the
1614          * last xid, however it could for a committed, but still retained,
1617         obd = req->rq_export->exp_obd;
1619         /* Check for aborted recovery... */
          /* Snapshot recovery flags under the bh-safe processing-task lock. */
1620         spin_lock_bh(&obd->obd_processing_task_lock);
1621         abort_recovery = obd->obd_abort_recovery;
1622         recovering = obd->obd_recovering;
1623         spin_unlock_bh(&obd->obd_processing_task_lock);
1624         if (abort_recovery) {
1625                 target_abort_recovery(obd);
1626         } else if (recovering) {
1630                 rc = mds_filter_recovery_request(req, obd, &should_process);
1631                 if (rc != 0 || !should_process) {
/*
 * Send the reply for a handled request.  LAST_REPLAY replies are queued
 * until recovery completes (target_queue_final_reply()); if recovery
 * already finished the request lost the race and gets -ENOTCONN.
 */
1638 static int mdt_reply(struct ptlrpc_request *req, int rc,
1639                      struct mdt_thread_info *info)
1641         struct obd_device *obd;
1644         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1645                 if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1646                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1648                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1649                 if (obd && obd->obd_recovering) {
1650                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1651                         RETURN(target_queue_final_reply(req, rc));
1653                 /* Lost a race with recovery; let the error path
1655                 rc = req->rq_status = -ENOTCONN;
1658         target_send_reply(req, rc, info->mti_fail_id);
1663 extern int mds_msg_check_version(struct lustre_msg *msg);
/*
 * Top-level request processing: version check, recovery gate, handler
 * lookup in @supported, dispatch via mdt_req_handle() and reply.  An
 * unknown opc yields -ENOTSUPP via ptlrpc_error(); a mal-formed message
 * is dropped with a CERROR.
 */
1665 static int mdt_handle0(struct ptlrpc_request *req,
1666                        struct mdt_thread_info *info,
1667                        struct mdt_opc_slice *supported)
1669         struct mdt_handler *h;
1670         struct lustre_msg *msg;
1675         MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1677         LASSERT(current->journal_info == NULL);
1679         msg = req->rq_reqmsg;
1680         rc = mds_msg_check_version(msg);
1682                 rc = mdt_recovery(req);
1685                         h = mdt_handler_find(lustre_msg_get_opc(msg),
1688                                 rc = mdt_req_handle(info, h, req);
1690                                 req->rq_status = -ENOTSUPP;
1691                                 rc = ptlrpc_error(req);
1696                         rc = mdt_reply(req, rc, info);
1699                 CERROR(LUSTRE_MDT_NAME" drops mal-formed request\n");
1704  * MDT handler function called by a ptlrpc service thread when a request arrives.
1706  * XXX common "target" functionality should be factored into a separate module
1707  * shared by mdt, ost and stand-alone services like fld.
/*
 * Common entry point for all MDT ptlrpc services: fetch the per-thread
 * mdt_thread_info from the service thread's context, initialize it for
 * this request, run mdt_handle0() against @supported and tear the info
 * down again.
 */
1709 static int mdt_handle_common(struct ptlrpc_request *req,
1710                              struct mdt_opc_slice *supported)
1712         struct lu_context *ctx;
1713         struct mdt_thread_info *info;
1717         ctx = req->rq_svc_thread->t_ctx;
1718         LASSERT(ctx != NULL);
1719         LASSERT(ctx->lc_thread == req->rq_svc_thread);
1720         info = lu_context_key_get(ctx, &mdt_thread_key);
1721         LASSERT(info != NULL);
1723         mdt_thread_info_init(req, info);
1725         rc = mdt_handle0(req, info, supported);
1727         mdt_thread_info_fini(info);
/*
 * Per-service entry points: each service forwards to mdt_handle_common()
 * with its own handler table.  Note the mdsc/mdss/dtss services all share
 * mdt_seq_handlers.
 */
1731 static int mdt_regular_handle(struct ptlrpc_request *req)
1733         return mdt_handle_common(req, mdt_regular_handlers);
1736 static int mdt_readpage_handle(struct ptlrpc_request *req)
1738         return mdt_handle_common(req, mdt_readpage_handlers);
1741 static int mdt_mdsc_handle(struct ptlrpc_request *req)
1743         return mdt_handle_common(req, mdt_seq_handlers);
1746 static int mdt_mdss_handle(struct ptlrpc_request *req)
1748         return mdt_handle_common(req, mdt_seq_handlers);
1751 static int mdt_dtss_handle(struct ptlrpc_request *req)
1753         return mdt_handle_common(req, mdt_seq_handlers);
1756 static int mdt_fld_handle(struct ptlrpc_request *req)
1758         return mdt_handle_common(req, mdt_fld_handlers);
1774 static int mdt_intent_getattr(enum mdt_it_code opcode,
1775 struct mdt_thread_info *info,
1776 struct ldlm_lock **,
1778 static int mdt_intent_reint(enum mdt_it_code opcode,
1779 struct mdt_thread_info *info,
1780 struct ldlm_lock **,
/*
 * Intent dispatch table, indexed by enum mdt_it_code: for each intent the
 * request format, capsule flags, policy callback and (for reint intents)
 * the expected REINT_* opcode.  A NULL it_act means the intent is not
 * handled here.
 */
1783 static struct mdt_it_flavor {
1784         const struct req_format *it_fmt;
1786         int                    (*it_act)(enum mdt_it_code ,
1787                                          struct mdt_thread_info *,
1788                                          struct ldlm_lock **,
1791 } mdt_it_flavor[] = {
1793                 .it_fmt   = &RQF_LDLM_INTENT,
1794                 /*.it_flags = HABEO_REFERO,*/
1796                 .it_act   = mdt_intent_reint,
1797                 .it_reint = REINT_OPEN
1800                 .it_fmt   = &RQF_LDLM_INTENT,
1801                 .it_flags = MUTABOR,
1802                 .it_act   = mdt_intent_reint,
1803                 .it_reint = REINT_OPEN
1806                 .it_fmt   = &RQF_LDLM_INTENT,
1807                 .it_flags = MUTABOR,
1808                 .it_act   = mdt_intent_reint,
1809                 .it_reint = REINT_CREATE
1811         [MDT_IT_GETATTR] = {
1812                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
1813                 .it_flags = HABEO_REFERO,
1814                 .it_act   = mdt_intent_getattr
1816         [MDT_IT_READDIR] = {
1822                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
1823                 .it_flags = HABEO_REFERO,
1824                 .it_act   = mdt_intent_getattr
1827                 .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
1828                 .it_flags = MUTABOR,
1829                 .it_act   = NULL, /* XXX can be mdt_intent_reint, ? */
1830                 .it_reint = REINT_UNLINK
1834                 .it_flags = MUTABOR,
1837         [MDT_IT_GETXATTR] = {
/*
 * Hand the server-side lock taken during intent execution back to the
 * client: either reuse a lock found by resend fixup, or convert the lock
 * referenced by @lh into a client-owned lock (clearing reader/writer refs,
 * attaching the export, copying ASTs and the remote handle from the
 * original enqueue lock).  Returns ELDLM_LOCK_REPLACED so the caller swaps
 * *lockp.
 */
1844 int mdt_intent_lock_replace(struct mdt_thread_info *info,
1845                             struct ldlm_lock **lockp,
1846                             struct ldlm_lock *new_lock,
1847                             struct mdt_lock_handle *lh,
1850         struct ptlrpc_request  *req = mdt_info_req(info);
1851         struct ldlm_lock       *lock = *lockp;
1854          * Get new lock only for cases when possible resent did not find any
1857         if (new_lock == NULL)
1858                 new_lock = ldlm_handle2lock(&lh->mlh_lh);
1860         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1863         LASSERTF(new_lock != NULL,
1864                  "lockh "LPX64"\n", lh->mlh_lh.cookie);
1867          * If we've already given this lock to a client once, then we should
1868          * have no readers or writers. Otherwise, we should have one reader
1869          * _or_ writer ref (which will be zeroed below) before returning the
1872         if (new_lock->l_export == req->rq_export) {
1873                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
1875                 LASSERT(new_lock->l_export == NULL);
1876                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
1881         if (new_lock->l_export == req->rq_export) {
1883                  * Already gave this to the client, which means that we
1884                  * reconstructed a reply.
1886                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1888                 RETURN(ELDLM_LOCK_REPLACED);
1891         /* Fixup the lock to be given to the client */
1892         lock_res_and_lock(new_lock);
1893         new_lock->l_readers = 0;
1894         new_lock->l_writers = 0;
1896         new_lock->l_export = class_export_get(req->rq_export);
1897         list_add(&new_lock->l_export_chain,
1898                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
          /* Inherit callbacks and the client-side handle from the enqueue lock. */
1900         new_lock->l_blocking_ast = lock->l_blocking_ast;
1901         new_lock->l_completion_ast = lock->l_completion_ast;
1902         new_lock->l_remote_handle = lock->l_remote_handle;
1903         new_lock->l_flags &= ~LDLM_FL_LOCAL;
1905         unlock_res_and_lock(new_lock);
1906         LDLM_LOCK_PUT(new_lock);
          /* The handle slot no longer owns the lock. */
1907         lh->mlh_lh.cookie = 0;
1909         RETURN(ELDLM_LOCK_REPLACED);
/*
 * For a MSG_RESENT request, search the export's held-lock list for a lock
 * whose remote handle matches the client's; if found, restore its cookie
 * and mode into @lh and return a reference via *old_lock.  If no match and
 * the xid does not correspond to the last (close) request either, clear
 * MSG_RESENT so the request is processed as new.
 */
1912 static void mdt_fixup_resent(struct req_capsule *pill,
1913                              struct ldlm_lock *new_lock,
1914                              struct ldlm_lock **old_lock,
1915                              struct mdt_lock_handle *lh)
1917         struct ptlrpc_request  *req = pill->rc_req;
1918         struct obd_export      *exp = req->rq_export;
1919         struct lustre_handle    remote_hdl;
1920         struct ldlm_request    *dlmreq;
1921         struct list_head       *iter;
1923         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
1926         dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
1927         remote_hdl = dlmreq->lock_handle1;
1929         spin_lock(&exp->exp_ldlm_data.led_lock);
1930         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
1931                 struct ldlm_lock *lock;
1932                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
              /* Skip the lock being enqueued right now. */
1933                 if (lock == new_lock)
1935                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
1936                         lh->mlh_lh.cookie = lock->l_handle.h_cookie;
1937                         lh->mlh_mode = lock->l_granted_mode;
1939                         LDLM_DEBUG(lock, "restoring lock cookie");
1940                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
1943                                 *old_lock = LDLM_LOCK_GET(lock);
1944                         spin_unlock(&exp->exp_ldlm_data.led_lock);
1948         spin_unlock(&exp->exp_ldlm_data.led_lock);
1951          * If the xid matches, then we know this is a resent request, and allow
1952          * it. (It's probably an OPEN, for which we don't send a lock.)
1954         if (req->rq_xid == req_exp_last_xid(req))
1957         if (req->rq_xid == req_exp_last_close_xid(req))
1961          * This remote handle isn't enqueued, so we never received or processed
1962          * this request. Clear MSG_RESENT, because it can be handled like any
1963          * normal request now.
1965         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1967         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
/*
 * GETATTR-family intent policy: choose child lock bits per opcode
 * (LOOKUP only, or LOOKUP|UPDATE for full getattr), execute the name
 * lookup + lock via mdt_getattr_name_lock(), shrink the reply, and either
 * abort the lock (negative lookup / error) or hand the new lock to the
 * client via mdt_intent_lock_replace().
 */
1971 static int mdt_intent_getattr(enum mdt_it_code opcode,
1972                               struct mdt_thread_info *info,
1973                               struct ldlm_lock **lockp,
1976         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
1977         struct ldlm_lock       *new_lock = NULL;
1979         struct ldlm_reply      *ldlm_rep;
1980         struct ptlrpc_request  *req;
1986                 child_bits = MDS_INODELOCK_LOOKUP;
1988         case MDT_IT_GETATTR:
1989                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1992                 CERROR("Unhandled till now");
1996         req = info->mti_pill.rc_req;
1997         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1998         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
2000         /* Get lock from request for possible resent case. */
2001         mdt_fixup_resent(&info->mti_pill, *lockp, &new_lock, lhc);
2003         ldlm_rep->lock_policy_res2 =
2004                 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
2005         mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
          /* A clean negative lookup is a success from the lock standpoint. */
2007         if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
2008                 ldlm_rep->lock_policy_res2 = 0;
2009         if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
2010             ldlm_rep->lock_policy_res2) {
2011                 RETURN(ELDLM_LOCK_ABORTED);
2014         return mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
/*
 * Reint (open/create) intent policy: decode the reint opcode, check it
 * matches the intent flavor table, run the reint with a possibly restored
 * resent lock, and report the result in the DLM reply.  -EREMOTE is the
 * cross-ref case where the taken lock is returned to the client; all other
 * outcomes abort the enqueued lock.
 */
2017 static int mdt_intent_reint(enum mdt_it_code opcode,
2018                             struct mdt_thread_info *info,
2019                             struct ldlm_lock **lockp,
2022         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2023         struct ldlm_reply      *rep;
2027         static const struct req_format *intent_fmts[REINT_MAX] = {
2028                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
2029                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
2034         opc = mdt_reint_opcode(info, intent_fmts);
2038         if (mdt_it_flavor[opcode].it_reint != opc) {
2039                 CERROR("Reint code %ld doesn't match intent: %d\n",
2044         /* Get lock from request for possible resent case. */
2045         mdt_fixup_resent(&info->mti_pill, *lockp, NULL, lhc);
2047         rc = mdt_reint_internal(info, lhc, opc);
2049         rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2053         /* MDC expects this in any case */
2055         mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
          /* NOTE(review): lock_policy_res2 is also assigned below at 2065;
           * elided lines between may branch — confirm against full source. */
2057         rep->lock_policy_res2 = rc;
2059         /* cross-ref case, the lock should be returned to the client */
2060         if (rc == -EREMOTE) {
2061                 LASSERT(lustre_handle_is_used(&lhc->mlh_lh));
2062                 rep->lock_policy_res2 = 0;
2063                 return mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
2065         rep->lock_policy_res2 = rc;
2067         RETURN(ELDLM_LOCK_ABORTED);
/*
 * Map a client intent opcode (IT_*) to the server's enum mdt_it_code;
 * unknown opcodes are reported with CERROR (error value set on elided
 * lines).
 */
2070 static int mdt_intent_code(long itcode)
2078         case IT_OPEN|IT_CREAT:
2085                 rc = MDT_IT_READDIR;
2088                 rc = MDT_IT_GETATTR;
2100                 rc = MDT_IT_GETXATTR;
2103                 CERROR("Unknown intent opcode: %ld\n", itcode);
/*
 * Look up the flavor for this intent, extend the capsule to the intent's
 * request format, unpack/pack per its flags, enforce read-only exports for
 * MUTABOR intents, and run the flavor's policy callback.
 */
2110 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
2111                           struct ldlm_lock **lockp, int flags)
2113         struct req_capsule   *pill;
2114         struct mdt_it_flavor *flv;
2119         opc = mdt_intent_code(itopc);
2123         pill = &info->mti_pill;
2124         flv  = &mdt_it_flavor[opc];
2126         if (flv->it_fmt != NULL)
2127                 req_capsule_extend(pill, flv->it_fmt);
2129         rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
2131                 struct ptlrpc_request *req = mdt_info_req(info);
              /* Refuse modifying intents on read-only connections. */
2132                 if (flv->it_flags & MUTABOR &&
2133                     req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
2136         if (rc == 0 && flv->it_act != NULL) {
2137                 /* execute policy */
2138                 rc = flv->it_act(opc, info, lockp, flags);
/*
 * LDLM intent policy callback registered with the namespace.  If the
 * enqueue carries an intent buffer, extend the capsule and dispatch via
 * mdt_intent_opc(); otherwise it is a plain enqueue and only the reply is
 * packed.
 */
2144 static int mdt_intent_policy(struct ldlm_namespace *ns,
2145                              struct ldlm_lock **lockp, void *req_cookie,
2146                              ldlm_mode_t mode, int flags, void *data)
2148         struct mdt_thread_info *info;
2149         struct ptlrpc_request  *req  =  req_cookie;
2150         struct ldlm_intent     *it;
2151         struct req_capsule     *pill;
2152         struct ldlm_lock       *lock = *lockp;
2157         LASSERT(req != NULL);
2159         info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
2160         LASSERT(info != NULL);
2161         pill = &info->mti_pill;
2162         LASSERT(pill->rc_req == req);
          /* An intent buffer follows the ldlm_request in the message. */
2164         if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
2165                 req_capsule_extend(pill, &RQF_LDLM_INTENT);
2166                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
2168                         LDLM_DEBUG(lock, "intent policy opc: %s\n",
2169                                    ldlm_it2str(it->opc));
2171                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
2177                 /* No intent was provided */
2178                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
2179                 rc = req_capsule_pack(pill);
/*
 * Tear down the sequence servers/client attached to this device's lu_site:
 * server seq, controller seq and the controller client, freeing each and
 * NULLing the site pointer so the teardown is idempotent.
 */
2187 static int mdt_seq_fini(const struct lu_context *ctx,
2188                         struct mdt_device *m)
2190         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2193         if (ls && ls->ls_server_seq) {
2194                 seq_server_fini(ls->ls_server_seq, ctx);
2195                 OBD_FREE_PTR(ls->ls_server_seq);
2196                 ls->ls_server_seq = NULL;
2199         if (ls && ls->ls_control_seq) {
2200                 seq_server_fini(ls->ls_control_seq, ctx);
2201                 OBD_FREE_PTR(ls->ls_control_seq);
2202                 ls->ls_control_seq = NULL;
2205         if (ls && ls->ls_client_seq) {
2206                 seq_client_fini(ls->ls_client_seq);
2207                 OBD_FREE_PTR(ls->ls_client_seq);
2208                 ls->ls_client_seq = NULL;
/*
 * Initialize the FID sequence machinery on this node.  Node 0 additionally
 * hosts the sequence controller (server + a "ctl-<name>" client talking to
 * it directly); every node gets a local metadata sequence server, and on
 * node 0 the controller client is wired into it via seq_server_set_cli().
 * All failures unwind through mdt_seq_fini().
 */
2214 static int mdt_seq_init(const struct lu_context *ctx,
2216                         struct mdt_device *m)
2223         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2226          * This is sequence-controller node. Init seq-controller server on local
2229         if (ls->ls_node_id == 0) {
2230                 LASSERT(ls->ls_control_seq == NULL);
2232                 OBD_ALLOC_PTR(ls->ls_control_seq);
2233                 if (ls->ls_control_seq == NULL)
2236                 rc = seq_server_init(ls->ls_control_seq,
2237                                      m->mdt_bottom, uuid,
2238                                      LUSTRE_SEQ_CONTROLLER,
2242                         GOTO(out_seq_fini, rc);
2244                 OBD_ALLOC_PTR(ls->ls_client_seq);
2245                 if (ls->ls_client_seq == NULL)
2246                         GOTO(out_seq_fini, rc = -ENOMEM);
2248                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2249                 if (prefix == NULL) {
2250                         OBD_FREE_PTR(ls->ls_client_seq);
2251                         GOTO(out_seq_fini, rc = -ENOMEM);
2254                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2258                  * Init seq-controller client after seq-controller server is
2259                  * ready. Pass ls->ls_control_seq to it for direct talking.
2261                 rc = seq_client_init(ls->ls_client_seq, NULL,
2262                                      LUSTRE_SEQ_METADATA, prefix,
2263                                      ls->ls_control_seq);
2264                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2267                         GOTO(out_seq_fini, rc);
2270         /* Init seq-server on local MDT */
2271         LASSERT(ls->ls_server_seq == NULL);
2273         OBD_ALLOC_PTR(ls->ls_server_seq);
2274         if (ls->ls_server_seq == NULL)
2275                 GOTO(out_seq_fini, rc = -ENOMEM);
2277         rc = seq_server_init(ls->ls_server_seq,
2278                              m->mdt_bottom, uuid,
          /* NOTE(review): this clobbers seq_server_init's error with -ENOMEM;
           * likely should be GOTO(out_seq_fini, rc) — confirm upstream. */
2282                 GOTO(out_seq_fini, rc = -ENOMEM);
2284         /* Assign seq-controller client to local seq-server. */
2285         if (ls->ls_node_id == 0) {
2286                 LASSERT(ls->ls_client_seq != NULL);
2288                 rc = seq_server_set_cli(ls->ls_server_seq,
2296         mdt_seq_fini(ctx, m);
2302 * Init client sequence manager which is used by local MDS to talk to sequence
2303 * controller on remote node.
/*
 * On a non-controller MDS, connect a client-side sequence manager to the
 * remote controller (node 0): parse the node index from the config, find
 * and connect to the controller's MDC, then create a "ctl-<name>" seq
 * client and attach it to the local sequence server.  Only runs for the
 * first MDC (index 0) when no client is set up yet.
 */
2305 static int mdt_seq_init_cli(const struct lu_context *ctx,
2306                             struct mdt_device *m,
2307                             struct lustre_cfg *cfg)
2309         struct lu_site    *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2310         struct obd_device *mdc;
2311         struct obd_uuid   *uuidp, *mdcuuidp;
2312         char              *uuid_str, *mdc_uuid_str;
2315         struct mdt_thread_info *info;
2316         char *p, *index_string = lustre_cfg_string(cfg, 2);
2319         info = lu_context_key_get(ctx, &mdt_thread_key);
2320         uuidp = &info->mti_u.uuid[0];
2321         mdcuuidp = &info->mti_u.uuid[1];
2323         LASSERT(index_string);
2325         index = simple_strtol(index_string, &p, 10);
2327                 CERROR("Invalid index in lustre_cgf, offset 2\n");
2331         /* check if this is adding the first MDC and controller is not yet
2333         if (index != 0 || ls->ls_client_seq)
2336         uuid_str = lustre_cfg_string(cfg, 1);
2337         mdc_uuid_str = lustre_cfg_string(cfg, 4);
2338         obd_str2uuid(uuidp, uuid_str);
2339         obd_str2uuid(mdcuuidp, mdc_uuid_str);
2341         mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, mdcuuidp);
2343                 CERROR("can't find controller MDC by uuid %s\n",
2346         } else if (!mdc->obd_set_up) {
2347                 CERROR("target %s not set up\n", mdc->obd_name);
2350                 struct lustre_handle conn = {0, };
2352                 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
2353                        mdc->obd_name, mdc->obd_uuid.uuid);
2355                 rc = obd_connect(ctx, &conn, mdc, &mdc->obd_uuid, NULL);
2358                         CERROR("target %s connect error %d\n",
2361                         ls->ls_control_exp = class_conn2export(&conn);
2363                         OBD_ALLOC_PTR(ls->ls_client_seq);
2365                         if (ls->ls_client_seq != NULL) {
2368                                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2372                                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2375                                 rc = seq_client_init(ls->ls_client_seq,
2377                                                      LUSTRE_SEQ_METADATA,
2379                                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2386                         LASSERT(ls->ls_server_seq != NULL);
2388                         rc = seq_server_set_cli(ls->ls_server_seq,
/*
 * Undo mdt_seq_init_cli(): detach the controller client from the local
 * sequence server and disconnect from the controller export.
 */
2397 static void mdt_seq_fini_cli(struct mdt_device *m)
2404         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2406         if (ls && ls->ls_server_seq)
2407                 seq_server_set_cli(ls->ls_server_seq,
2410         if (ls && ls->ls_control_exp) {
2411                 rc = obd_disconnect(ls->ls_control_exp);
2413                         CERROR("failure to disconnect "
2416                 ls->ls_control_exp = NULL;
/*
 * Tear down the FLD (FID Location Database) server and client attached to
 * this device's lu_site, freeing each and NULLing the pointers.
 */
2424 static int mdt_fld_fini(const struct lu_context *ctx,
2425                         struct mdt_device *m)
2427         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2430         if (ls && ls->ls_server_fld) {
2431                 fld_server_fini(ls->ls_server_fld, ctx);
2432                 OBD_FREE_PTR(ls->ls_server_fld);
2433                 ls->ls_server_fld = NULL;
2436         if (ls && ls->ls_client_fld != NULL) {
2437                 fld_client_fini(ls->ls_client_fld);
2438                 OBD_FREE_PTR(ls->ls_client_fld);
2439                 ls->ls_client_fld = NULL;
/*
 * Initialize the FLD server and client on this node's lu_site and register
 * the local server as a target of the client.  Failures unwind through
 * mdt_fld_fini().
 */
2445 static int mdt_fld_init(const struct lu_context *ctx,
2447                         struct mdt_device *m)
2449         struct lu_fld_target target;
2454         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2456         OBD_ALLOC_PTR(ls->ls_server_fld);
2457         if (ls->ls_server_fld == NULL)
2458                 RETURN(rc = -ENOMEM);
2460         rc = fld_server_init(ls->ls_server_fld,
2461                              m->mdt_bottom, uuid, ctx);
2463                 OBD_FREE_PTR(ls->ls_server_fld);
2464                 ls->ls_server_fld = NULL;
2467         OBD_ALLOC_PTR(ls->ls_client_fld);
2468         if (!ls->ls_client_fld)
2469                 GOTO(out_fld_fini, rc = -ENOMEM);
2471         rc = fld_client_init(ls->ls_client_fld, uuid,
2472                              LUSTRE_CLI_FLD_HASH_DHT);
2474                 CERROR("can't init FLD, err %d\n",  rc);
2475                 OBD_FREE_PTR(ls->ls_client_fld);
2476                 GOTO(out_fld_fini, rc);
          /* Point the client at the local FLD server (this node). */
2479         target.ft_srv = ls->ls_server_fld;
2480         target.ft_idx = ls->ls_node_id;
2481         target.ft_exp = NULL;
2483         fld_client_add_target(ls->ls_client_fld, &target);
2487         mdt_fld_fini(ctx, m);
2491 /* device init/fini methods */
/*
 * Unregister every ptlrpc service this MDT started (regular, readpage,
 * setattr, seq controller/server, data seq, FLD), NULLing each pointer so
 * the function is safe to call on a partially started device.
 */
2492 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
2494         if (m->mdt_regular_service != NULL) {
2495                 ptlrpc_unregister_service(m->mdt_regular_service);
2496                 m->mdt_regular_service = NULL;
2498         if (m->mdt_readpage_service != NULL) {
2499                 ptlrpc_unregister_service(m->mdt_readpage_service);
2500                 m->mdt_readpage_service = NULL;
2502         if (m->mdt_setattr_service != NULL) {
2503                 ptlrpc_unregister_service(m->mdt_setattr_service);
2504                 m->mdt_setattr_service = NULL;
2506         if (m->mdt_mdsc_service != NULL) {
2507                 ptlrpc_unregister_service(m->mdt_mdsc_service);
2508                 m->mdt_mdsc_service = NULL;
2510         if (m->mdt_mdss_service != NULL) {
2511                 ptlrpc_unregister_service(m->mdt_mdss_service);
2512                 m->mdt_mdss_service = NULL;
2514         if (m->mdt_dtss_service != NULL) {
2515                 ptlrpc_unregister_service(m->mdt_dtss_service);
2516                 m->mdt_dtss_service = NULL;
2518         if (m->mdt_fld_service != NULL) {
2519                 ptlrpc_unregister_service(m->mdt_fld_service);
2520                 m->mdt_fld_service = NULL;
/*
 * Start all ptlrpc services for this MDT: the regular metadata service,
 * readpage, setattr, sequence controller (mdsc), metadata sequence server
 * (mdss), data sequence server (dtss) and FLD.  Each service is configured
 * via the reused function-scope `conf` and gets its own thread pool; any
 * failure unwinds through err_mdt_svc -> mdt_stop_ptlrpc_service().
 * NOTE(review): `conf` is `static`, so concurrent device setup would race
 * on it — presumably setup is serialized; confirm.
 */
2524 static int mdt_start_ptlrpc_service(struct mdt_device *m)
2527         static struct ptlrpc_service_conf conf;
2530         conf = (typeof(conf)) {
2531                 .psc_nbufs            = MDS_NBUFS,
2532                 .psc_bufsize          = MDS_BUFSIZE,
2533                 .psc_max_req_size     = MDS_MAXREQSIZE,
2534                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2535                 .psc_req_portal       = MDS_REQUEST_PORTAL,
2536                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2537                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2539                  * We'd like to have a mechanism to set this on a per-device
2540                  * basis, but alas...
2542                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2544                 .psc_ctx_tags      = LCT_MD_THREAD
2547         m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
2548         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2549                            "mdt_ldlm_client", m->mdt_ldlm_client);
2551         m->mdt_regular_service =
2552                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
2553                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2555         if (m->mdt_regular_service == NULL)
2558         rc = ptlrpc_start_threads(NULL, m->mdt_regular_service, LUSTRE_MDT_NAME);
2560                 GOTO(err_mdt_svc, rc);
2563          * readpage service configuration. Parameters have to be adjusted,
2566         conf = (typeof(conf)) {
2567                 .psc_nbufs            = MDS_NBUFS,
2568                 .psc_bufsize          = MDS_BUFSIZE,
2569                 .psc_max_req_size     = MDS_MAXREQSIZE,
2570                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2571                 .psc_req_portal       = MDS_READPAGE_PORTAL,
2572                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2573                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2574                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2576                 .psc_ctx_tags      = LCT_MD_THREAD
2578         m->mdt_readpage_service =
2579                 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
2580                                      LUSTRE_MDT_NAME "_readpage",
2581                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2584         if (m->mdt_readpage_service == NULL) {
2585                 CERROR("failed to start readpage service\n");
2586                 GOTO(err_mdt_svc, rc = -ENOMEM);
2589         rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
2592          * setattr service configuration.
2594         conf = (typeof(conf)) {
2595                 .psc_nbufs            = MDS_NBUFS,
2596                 .psc_bufsize          = MDS_BUFSIZE,
2597                 .psc_max_req_size     = MDS_MAXREQSIZE,
2598                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2599                 .psc_req_portal       = MDS_SETATTR_PORTAL,
2600                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2601                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2602                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2604                 .psc_ctx_tags      = LCT_MD_THREAD
2607         m->mdt_setattr_service =
2608                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
2609                                      LUSTRE_MDT_NAME "_setattr",
2610                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2613         if (!m->mdt_setattr_service) {
2614                 CERROR("failed to start setattr service\n");
2615                 GOTO(err_mdt_svc, rc = -ENOMEM);
2618         rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
2620                 GOTO(err_mdt_svc, rc);
2623          * sequence controller service configuration
2625         conf = (typeof(conf)) {
2626                 .psc_nbufs = MDS_NBUFS,
2627                 .psc_bufsize = MDS_BUFSIZE,
2628                 .psc_max_req_size = SEQ_MAXREQSIZE,
2629                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2630                 .psc_req_portal = SEQ_CONTROLLER_PORTAL,
2631                 .psc_rep_portal = MDC_REPLY_PORTAL,
2632                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2633                 .psc_num_threads = SEQ_NUM_THREADS,
2634                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2637         m->mdt_mdsc_service =
2638                 ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
2639                                      LUSTRE_MDT_NAME"_mdsc",
2640                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2642         if (!m->mdt_mdsc_service) {
2643                 CERROR("failed to start seq controller service\n");
2644                 GOTO(err_mdt_svc, rc = -ENOMEM);
2647         rc = ptlrpc_start_threads(NULL, m->mdt_mdsc_service, "mdt_mdsc");
2649                 GOTO(err_mdt_svc, rc);
2652          * metadata sequence server service configuration
2654         conf = (typeof(conf)) {
2655                 .psc_nbufs = MDS_NBUFS,
2656                 .psc_bufsize = MDS_BUFSIZE,
2657                 .psc_max_req_size = SEQ_MAXREQSIZE,
2658                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2659                 .psc_req_portal = SEQ_METADATA_PORTAL,
2660                 .psc_rep_portal = MDC_REPLY_PORTAL,
2661                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2662                 .psc_num_threads = SEQ_NUM_THREADS,
2663                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2666         m->mdt_mdss_service =
2667                 ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
2668                                      LUSTRE_MDT_NAME"_mdss",
2669                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2671         if (!m->mdt_mdss_service) {
2672                 CERROR("failed to start metadata seq server service\n");
2673                 GOTO(err_mdt_svc, rc = -ENOMEM);
2676         rc = ptlrpc_start_threads(NULL, m->mdt_mdss_service, "mdt_mdss");
2678                 GOTO(err_mdt_svc, rc);
2682          * Data sequence server service configuration. We want to have really
2683          * cluster-wide sequences space. This is why we start only one sequence
2684          * controller which manages space.
2686         conf = (typeof(conf)) {
2687                 .psc_nbufs = MDS_NBUFS,
2688                 .psc_bufsize = MDS_BUFSIZE,
2689                 .psc_max_req_size = SEQ_MAXREQSIZE,
2690                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2691                 .psc_req_portal = SEQ_DATA_PORTAL,
2692                 .psc_rep_portal = OSC_REPLY_PORTAL,
2693                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2694                 .psc_num_threads = SEQ_NUM_THREADS,
2695                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2698         m->mdt_dtss_service =
2699                 ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
2700                                      LUSTRE_MDT_NAME"_dtss",
2701                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2703         if (!m->mdt_dtss_service) {
2704                 CERROR("failed to start data seq server service\n");
2705                 GOTO(err_mdt_svc, rc = -ENOMEM);
2708         rc = ptlrpc_start_threads(NULL, m->mdt_dtss_service, "mdt_dtss");
2710                 GOTO(err_mdt_svc, rc);
2712         /* FLD service start */
2713         conf = (typeof(conf)) {
2714                 .psc_nbufs            = MDS_NBUFS,
2715                 .psc_bufsize          = MDS_BUFSIZE,
2716                 .psc_max_req_size     = FLD_MAXREQSIZE,
2717                 .psc_max_reply_size   = FLD_MAXREPSIZE,
2718                 .psc_req_portal       = FLD_REQUEST_PORTAL,
2719                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2720                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2721                 .psc_num_threads      = FLD_NUM_THREADS,
2722                 .psc_ctx_tags         = LCT_DT_THREAD|LCT_MD_THREAD
2725         m->mdt_fld_service =
2726                 ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
2727                                      LUSTRE_MDT_NAME"_fld",
2728                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2730         if (!m->mdt_fld_service) {
2731                 CERROR("failed to start fld service\n");
2732                 GOTO(err_mdt_svc, rc = -ENOMEM);
2735         rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, "mdt_fld");
2737                 GOTO(err_mdt_svc, rc);
2742         mdt_stop_ptlrpc_service(m);
/*
 * Tear down the device stack below @top: send LCFG_CLEANUP down the stack,
 * purge the site's object cache, then walk the layers calling each type's
 * device_fini/device_free and dropping the module reference on its type.
 */
2747 static void mdt_stack_fini(const struct lu_context *ctx,
2748                            struct mdt_device *m, struct lu_device *top)
2750         struct lu_device        *d = top, *n;
2751         struct lustre_cfg_bufs  *bufs;
2752         struct lustre_cfg       *lcfg;
2753         struct mdt_thread_info  *info;
2756         info = lu_context_key_get(ctx, &mdt_thread_key);
2757         LASSERT(info != NULL);
2759         bufs = &info->mti_u.bufs;
2760         /* process cleanup */
2761         lustre_cfg_bufs_reset(bufs, NULL);
2762         lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
2764                 CERROR("Cannot alloc lcfg!\n");
2768         top->ld_ops->ldo_process_config(ctx, top, lcfg);
2769         lustre_cfg_free(lcfg);
          /* Drop all cached objects before freeing the layers. */
2771         lu_site_purge(ctx, top->ld_site, ~0);
2773                 struct obd_type *type;
2774                 struct lu_device_type *ldt = d->ld_type;
2776                 /* each fini() returns next device in stack of layers
2777                  * so we can avoid the recursion */
2778                 n = ldt->ldt_ops->ldto_device_fini(ctx, d);
2780                 ldt->ldt_ops->ldto_device_free(ctx, d);
2781                 type = ldt->ldt_obd_type;
2783                 class_put_type(type);
2785                 /* switch to the next device in the layer */
2788         m->mdt_child = NULL;
/*
 * Load obd type @typename and stack one device layer of that type on top of
 * @child: get the type (pins the module), refill the context with any keys
 * the new module registered, allocate the device, inherit the child's
 * lu_site, and run ldto_device_init().  On failure the visible cleanup
 * labels free the device and drop the type reference.
 *
 * NOTE(review): error-branch conditions and RETURN lines are elided in this
 * listing; presumably returns the new lu_device or an ERR_PTR, judging by
 * the callers in mdt_stack_init() — confirm against the full source.
 */
2791 static struct lu_device *mdt_layer_setup(const struct lu_context *ctx,
2792 const char *typename,
2793 struct lu_device *child,
2794 struct lustre_cfg *cfg)
2796 struct obd_type *type;
2797 struct lu_device_type *ldt;
2798 struct lu_device *d;
2803 type = class_get_type(typename);
2805 CERROR("Unknown type: '%s'\n", typename);
2806 GOTO(out, rc = -ENODEV);
/* New module may have registered new context keys; refill this context. */
2809 rc = lu_context_refill(ctx);
2811 CERROR("Failure to refill context: '%d'\n", rc);
2817 CERROR("type: '%s'\n", typename);
2818 GOTO(out_type, rc = -EINVAL);
2821 ldt->ldt_obd_type = type;
2822 d = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
2824 CERROR("Cannot allocate device: '%s'\n", typename);
2825 GOTO(out_type, rc = -ENODEV);
/* All layers of one stack share the top-level device's lu_site. */
2828 LASSERT(child->ld_site);
2829 d->ld_site = child->ld_site;
2832 rc = ldt->ldt_ops->ldto_device_init(ctx, d, child);
2834 CERROR("can't init device '%s', rc %d\n", typename, rc);
2835 GOTO(out_alloc, rc);
/* Error unwind: free the allocated device, then drop the type ref. */
2842 ldt->ldt_ops->ldto_device_free(ctx, d);
2845 class_put_type(type);
2850 static int mdt_stack_init(const struct lu_context *ctx,
2851 struct mdt_device *m, struct lustre_cfg *cfg)
2853 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2854 struct lu_device *tmp;
2855 struct md_device *md;
2859 /* init the stack */
2860 tmp = mdt_layer_setup(ctx, LUSTRE_OSD_NAME, d, cfg);
2862 RETURN(PTR_ERR(tmp));
2864 m->mdt_bottom = lu2dt_dev(tmp);
2866 tmp = mdt_layer_setup(ctx, LUSTRE_MDD_NAME, d, cfg);
2868 GOTO(out, rc = PTR_ERR(tmp));
2873 tmp = mdt_layer_setup(ctx, LUSTRE_CMM_NAME, d, cfg);
2875 GOTO(out, rc = PTR_ERR(tmp));
2878 /*set mdd upcall device*/
2879 md->md_upcall.mu_upcall_dev = lu2md_dev(d);
2882 /*set cmm upcall device*/
2883 md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
2885 m->mdt_child = lu2md_dev(d);
2887 /* process setup config */
2888 tmp = &m->mdt_md_dev.md_lu_dev;
2889 rc = tmp->ld_ops->ldo_process_config(ctx, tmp, cfg);
2892 /* fini from last known good lu_device */
2894 mdt_stack_fini(ctx, m, d);
/*
 * Full MDT shutdown, roughly the reverse of mdt_init0(): finish recovery,
 * stop the ping evictor and ptlrpc services, free the LDLM namespace,
 * tear down sequence/FLD state and the backing filesystem, then dismantle
 * the lower device stack and the md_device itself.
 *
 * NOTE(review): braces and some cleanup lines are elided in this listing;
 * ordering commentary is based on the visible statements only.
 */
2899 static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
2901 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2902 struct lu_site *ls = d->ld_site;
2905 target_cleanup_recovery(m->mdt_md_dev.md_lu_dev.ld_obd);
2906 ping_evictor_stop();
2907 mdt_stop_ptlrpc_service(m);
2909 if (m->mdt_namespace != NULL) {
2910 ldlm_namespace_free(m->mdt_namespace, 0);
2911 m->mdt_namespace = NULL;
2914 mdt_seq_fini(ctx, m);
2915 mdt_seq_fini_cli(m);
2916 mdt_fld_fini(ctx, m);
2918 mdt_fs_cleanup(ctx, m);
2920 /* finish the stack */
2921 mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
/* By now every lu_object must be gone; the site ref count proves it. */
2928 LASSERT(atomic_read(&d->ld_ref) == 0);
2929 md_device_fini(&m->mdt_md_dev);
2934 int mdt_postrecov(const struct lu_context *, struct mdt_device *);
/*
 * Main MDT device initialization, driven from mdt_device_alloc().
 * Order of bring-up: locks and option defaults, md_device/lu_site init,
 * lprocfs, the lower device stack, node id from the config, FLD, SEQ,
 * the LDLM namespace with the intent policy, ptlrpc services, ping
 * evictor, backing filesystem, and finally post-recovery processing if
 * the obd is not recovering.  The tail labels unwind in reverse order.
 *
 * @cfg buffer 0 is the obd device name, buffer 2 the server index string.
 *
 * NOTE(review): NULL checks, `if (rc)` conditions, braces and the
 * declarations of `s`/`rc` are elided in this listing; comments describe
 * the visible statements only.
 */
2936 static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
2937 struct lu_device_type *ldt, struct lustre_cfg *cfg)
2939 struct lprocfs_static_vars lvars;
2940 struct mdt_thread_info *info;
2941 struct obd_device *obd;
2942 const char *dev = lustre_cfg_string(cfg, 0);
2943 const char *num = lustre_cfg_string(cfg, 2);
2948 info = lu_context_key_get(ctx, &mdt_thread_key);
2949 LASSERT(info != NULL);
2951 obd = class_name2obd(dev);
2954 spin_lock_init(&m->mdt_transno_lock);
2956 m->mdt_max_mdsize = MAX_MD_SIZE;
2957 m->mdt_max_cookiesize = sizeof(struct llog_cookie);
2959 spin_lock_init(&m->mdt_ioepoch_lock);
2960 /* Temporary. should parse mount option. */
2961 m->mdt_opts.mo_user_xattr = 0;
2962 m->mdt_opts.mo_acl = 0;
2963 m->mdt_opts.mo_compat_resname = 0;
2964 obd->obd_replayable = 1;
2965 spin_lock_init(&m->mdt_client_bitmap_lock);
2971 md_device_init(&m->mdt_md_dev, ldt);
2972 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
2973 m->mdt_md_dev.md_lu_dev.ld_obd = obd;
2974 /* set this lu_device to obd, because error handling need it */
2975 obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev;
2977 rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
2979 CERROR("can't init lu_site, rc %d\n", rc);
2980 GOTO(err_free_site, rc);
2983 lprocfs_init_vars(mdt, &lvars);
2984 rc = lprocfs_obd_setup(obd, lvars.obd_vars);
2986 CERROR("can't init lprocfs, rc %d\n", rc);
2987 GOTO(err_fini_site, rc);
2990 /* init the stack */
2991 rc = mdt_stack_init(ctx, m, cfg);
2993 CERROR("can't init device stack, rc %d\n", rc);
2994 GOTO(err_fini_site, rc);
2997 /* set server index */
2999 s->ls_node_id = simple_strtol(num, NULL, 10);
3001 rc = mdt_fld_init(ctx, obd->obd_name, m);
3003 GOTO(err_fini_stack, rc);
3005 rc = mdt_seq_init(ctx, obd->obd_name, m);
3007 GOTO(err_fini_fld, rc);
/* Namespace name embeds the device pointer to guarantee uniqueness. */
3009 snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
3010 LUSTRE_MDT_NAME"-%p", m);
3011 m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
3012 LDLM_NAMESPACE_SERVER);
3013 if (m->mdt_namespace == NULL)
3014 GOTO(err_fini_seq, rc = -ENOMEM);
3016 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
3018 rc = mdt_start_ptlrpc_service(m);
3020 GOTO(err_free_ns, rc);
3022 ping_evictor_start();
3023 rc = mdt_fs_setup(ctx, m);
3025 GOTO(err_stop_service, rc);
3026 if(obd->obd_recovering == 0)
3027 mdt_postrecov(ctx, m);
/* Error unwind labels, reverse order of the setup steps above. */
3031 mdt_stop_ptlrpc_service(m);
3033 ldlm_namespace_free(m->mdt_namespace, 0);
3034 m->mdt_namespace = NULL;
3036 mdt_seq_fini(ctx, m);
3038 mdt_fld_fini(ctx, m);
3040 mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
3046 md_device_fini(&m->mdt_md_dev);
3050 /* used by MGS to process specific configurations */
/*
 * ldo_process_config handler: for the sequence-controller setup command the
 * MDT initializes its controller export client; all other commands are
 * passed unchanged to the next (child) device in the stack.
 *
 * NOTE(review): the switch case labels and closing braces are elided in
 * this listing; the exact lcfg_command handled here cannot be confirmed
 * from the visible lines.
 */
3051 static int mdt_process_config(const struct lu_context *ctx,
3052 struct lu_device *d, struct lustre_cfg *cfg)
3054 struct mdt_device *m = mdt_dev(d);
3055 struct md_device *md_next = m->mdt_child;
3056 struct lu_device *next = md2lu_dev(md_next);
3060 switch (cfg->lcfg_command) {
3063 * Add mdc hook to get first MDT uuid and connect it to
3064 * ls->controller to use for seq manager.
3066 err = mdt_seq_init_cli(ctx, mdt_dev(d), cfg);
3068 CERROR("can't initialize controller export, "
3072 /* others are passed further */
3073 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
/*
 * ldo_object_alloc handler: allocate an mdt_object, initialize its embedded
 * lu_object and header, register it as the top slice of the compound object
 * and install mdt_obj_ops.
 *
 * NOTE(review): the OBD_ALLOC call, NULL check and return statements are
 * elided in this listing.
 */
3079 static struct lu_object *mdt_object_alloc(const struct lu_context *ctxt,
3080 const struct lu_object_header *hdr,
3081 struct lu_device *d)
3083 struct mdt_object *mo;
3089 struct lu_object *o;
3090 struct lu_object_header *h;
3092 o = &mo->mot_obj.mo_lu;
3093 h = &mo->mot_header;
3094 lu_object_header_init(h);
3095 lu_object_init(o, h, d);
/* MDT is the topmost layer, so its slice heads the object's slice list. */
3096 lu_object_add_top(h, o);
3097 o->lo_ops = &mdt_obj_ops;
/*
 * loo_object_init handler: ask the child (lower) device to allocate its
 * slice for this object and link it below the MDT slice.
 *
 * NOTE(review): the else/ENOMEM branch and return are elided in this
 * listing; presumably failure to allocate @below yields -ENOMEM.
 */
3103 static int mdt_object_init(const struct lu_context *ctxt, struct lu_object *o)
3105 struct mdt_device *d = mdt_dev(o->lo_dev);
3106 struct lu_device *under;
3107 struct lu_object *below;
3111 CDEBUG(D_INFO, "object init, fid = "DFID"\n",
3112 PFID(lu_object_fid(o)));
3114 under = &d->mdt_child->md_lu_dev;
3115 below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
3116 if (below != NULL) {
3117 lu_object_add(o, below);
/*
 * loo_object_free handler: finalize the lu_object/header pair embedded in
 * the mdt_object.  NOTE(review): the lu_object_fini call and OBD_FREE of
 * @mo are elided in this listing.
 */
3123 static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o)
3125 struct mdt_object *mo = mdt_obj(o);
3126 struct lu_object_header *h;
3130 CDEBUG(D_INFO, "object free, fid = "DFID"\n",
3131 PFID(lu_object_fid(o)));
3134 lu_object_header_fini(h);
/* loo_object_print handler: emit a one-line "<name>-object@<ptr>" tag. */
3139 static int mdt_object_print(const struct lu_context *ctxt, void *cookie,
3140 lu_printer_t p, const struct lu_object *o)
3142 return (*p)(ctxt, cookie, LUSTRE_MDT_NAME"-object@%p", o);
/* lu_device operations: object allocation and MGS config processing. */
3145 static struct lu_device_operations mdt_lu_ops = {
3146 .ldo_object_alloc = mdt_object_alloc,
3147 .ldo_process_config = mdt_process_config
/* lu_object operations for the MDT slice of a compound object. */
3150 static struct lu_object_operations mdt_obj_ops = {
3151 .loo_object_init = mdt_object_init,
3152 .loo_object_free = mdt_object_free,
3153 .loo_object_print = mdt_object_print
3156 /* mds_connect_internal */
/*
 * Negotiate the connect flags with a (re)connecting client: mask both the
 * connect flags and the known inode-lock bits down to what this server
 * supports, strip IBITS/ACL/XATTR support when unusable or disabled by
 * server options, record the agreed flags on the export and report our
 * version back in @data.
 *
 * NOTE(review): the visible code only warns (CWARN) when the server
 * requires ACLs but the client lacks them; whether an error is returned
 * is not visible in this listing.
 */
3157 static int mdt_connect_internal(struct obd_export *exp,
3158 struct mdt_device *mdt,
3159 struct obd_connect_data *data)
3162 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
3163 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
3165 /* If no known bits (which should not happen, probably,
3166 as everybody should support LOOKUP and UPDATE bits at least)
3167 revert to compat mode with plain locks. */
3168 if (!data->ocd_ibits_known &&
3169 data->ocd_connect_flags & OBD_CONNECT_IBITS)
3170 data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
3172 if (!mdt->mdt_opts.mo_acl)
3173 data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
3175 if (!mdt->mdt_opts.mo_user_xattr)
3176 data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
3178 exp->exp_connect_flags = data->ocd_connect_flags;
3179 data->ocd_version = LUSTRE_VERSION_CODE;
3180 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
3183 if (mdt->mdt_opts.mo_acl &&
3184 ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
3185 CWARN("%s: MDS requires ACL support but client does not\n",
3186 mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3192 /* mds_connect copy */
/*
 * o_connect handler: create the export for @cluuid, negotiate connect
 * flags via mdt_connect_internal(), allocate per-client data (mcd) and
 * register the client in the last_rcvd file via mdt_client_new().  On
 * failure the export is disconnected; the conn-to-export reference is
 * always dropped before return.
 *
 * NOTE(review): mcd allocation, several `if (rc)` branches and the RETURN
 * are elided in this listing.
 */
3193 static int mdt_obd_connect(const struct lu_context *ctx,
3194 struct lustre_handle *conn, struct obd_device *obd,
3195 struct obd_uuid *cluuid,
3196 struct obd_connect_data *data)
3198 struct mdt_export_data *med;
3199 struct mdt_client_data *mcd;
3200 struct obd_export *exp;
3201 struct mdt_device *mdt;
3205 LASSERT(ctx != NULL);
3206 if (!conn || !obd || !cluuid)
3209 mdt = mdt_dev(obd->obd_lu_dev);
3211 rc = class_connect(conn, obd, cluuid);
3215 exp = class_conn2export(conn);
3216 LASSERT(exp != NULL);
3217 med = &exp->exp_mdt_data;
3219 rc = mdt_connect_internal(exp, mdt, data);
3223 memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
3225 rc = mdt_client_new(ctx, mdt, med);
3233 class_disconnect(exp);
3235 class_export_put(exp);
/*
 * o_reconnect handler: re-negotiate connect flags on an existing export;
 * no new per-client state is created on reconnect.
 */
3240 static int mdt_obd_reconnect(struct obd_export *exp, struct obd_device *obd,
3241 struct obd_uuid *cluuid,
3242 struct obd_connect_data *data)
3247 if (exp == NULL || obd == NULL || cluuid == NULL)
3250 rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
/*
 * o_disconnect handler: disconnect the export first so clients cannot keep
 * using it, cancel all LDLM locks held for it, then flush every outstanding
 * ("difficult") reply by rescheduling each onto its service.  Takes an
 * extra export reference for the duration of the teardown.
 */
3255 static int mdt_obd_disconnect(struct obd_export *exp)
3257 struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
3262 class_export_get(exp);
3264 /* Disconnect early so that clients can't keep using export */
3265 rc = class_disconnect(exp);
/* NOTE(review): `||` here cancels locks if EITHER namespace exists;
 * verify against upstream whether `&&` (both non-NULL) was intended. */
3266 if (mdt->mdt_namespace != NULL || exp->exp_obd->obd_namespace != NULL)
3267 ldlm_cancel_locks_for_export(exp);
3269 /* complete all outstanding replies */
3270 spin_lock(&exp->exp_lock);
3271 while (!list_empty(&exp->exp_outstanding_replies)) {
3272 struct ptlrpc_reply_state *rs =
3273 list_entry(exp->exp_outstanding_replies.next,
3274 struct ptlrpc_reply_state, rs_exp_list);
3275 struct ptlrpc_service *svc = rs->rs_service;
3277 spin_lock(&svc->srv_lock);
3278 list_del_init(&rs->rs_exp_list);
3279 ptlrpc_schedule_difficult_reply(rs);
3280 spin_unlock(&svc->srv_lock);
3282 spin_unlock(&exp->exp_lock);
3284 class_export_put(exp);
3288 /* FIXME: Can we avoid using these two interfaces? */
/*
 * o_init_export handler: initialize the per-export open-files list/lock
 * and mark the export as connecting.
 */
3289 static int mdt_init_export(struct obd_export *exp)
3291 struct mdt_export_data *med = &exp->exp_mdt_data;
3294 INIT_LIST_HEAD(&med->med_open_head);
3295 spin_lock_init(&med->med_open_lock);
3296 exp->exp_connecting = 1;
/*
 * o_destroy_export handler: final teardown of a client export.  Sets up a
 * private lu_context/mdt_thread_info (this runs outside any service
 * thread), closes every file the client still has open — unhashing each
 * mfd under med_open_lock and dropping the lock around mdt_mfd_close() —
 * and removes the client's slot from the last_rcvd file.  The self-export
 * (uuid equal to the obd's own uuid) is skipped.
 *
 * NOTE(review): several `if (rc)` branches, RETURN lines and braces are
 * elided in this listing.
 */
3300 static int mdt_destroy_export(struct obd_export *export)
3302 struct mdt_export_data *med;
3303 struct obd_device *obd = export->exp_obd;
3304 struct mdt_device *mdt;
3305 struct mdt_thread_info *info;
3306 struct lu_context ctxt;
3311 med = &export->exp_mdt_data;
3313 target_destroy_export(export);
/* The obd's self-export has no client state to clean up. */
3315 if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
3318 mdt = mdt_dev(obd->obd_lu_dev);
3319 LASSERT(mdt != NULL);
3321 rc = lu_context_init(&ctxt, LCT_MD_THREAD);
3325 lu_context_enter(&ctxt);
3327 info = lu_context_key_get(&ctxt, &mdt_thread_key);
3328 LASSERT(info != NULL);
3329 memset(info, 0, sizeof *info);
3330 info->mti_ctxt = &ctxt;
3331 info->mti_mdt = mdt;
/* Scratch buffers sized for the largest possible LOV EA and llog cookie. */
3333 ma = &info->mti_attr;
3334 ma->ma_lmm_size = mdt->mdt_max_mdsize;
3335 ma->ma_cookie_size = mdt->mdt_max_cookiesize;
3336 OBD_ALLOC(ma->ma_lmm, mdt->mdt_max_mdsize);
3337 OBD_ALLOC(ma->ma_cookie, mdt->mdt_max_cookiesize);
3339 if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
3340 GOTO(out, rc = -ENOMEM);
3341 ma->ma_need = MA_LOV | MA_COOKIE;
3343 /* Close any open files (which may also cause orphan unlinking). */
3344 spin_lock(&med->med_open_lock);
3345 while (!list_empty(&med->med_open_head)) {
3346 struct list_head *tmp = med->med_open_head.next;
3347 struct mdt_file_data *mfd =
3348 list_entry(tmp, struct mdt_file_data, mfd_list);
3349 struct md_attr *ma = &info->mti_attr;
3351 /* Remove mfd handle so it can't be found again.
3352 * We are consuming the mfd_list reference here. */
3353 class_handle_unhash(&mfd->mfd_handle);
3354 list_del_init(&mfd->mfd_list);
/* Drop the spinlock across mdt_mfd_close(); it may sleep. */
3355 spin_unlock(&med->med_open_lock);
3356 mdt_mfd_close(info, mfd);
3357 /* TODO: if we close the unlinked file,
3358 * we need to remove it's objects from OST */
3359 memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
3360 spin_lock(&med->med_open_lock);
3362 spin_unlock(&med->med_open_lock);
3363 info->mti_mdt = NULL;
3364 mdt_client_del(&ctxt, mdt, med);
3368 OBD_FREE(ma->ma_lmm, mdt->mdt_max_mdsize);
3370 OBD_FREE(ma->ma_cookie, mdt->mdt_max_cookiesize);
3371 lu_context_exit(&ctxt);
3372 lu_context_fini(&ctxt);
/*
 * Upcall from the child md_device.  Visible events: refresh the cached
 * max MD/cookie sizes from the child (mdo_maxsize_get), and disable
 * transaction handling for the current thread via mti_no_need_trans.
 * Unknown events are rejected with an error message.
 *
 * NOTE(review): the switch statement and case labels naming the
 * md_upcall_event values are elided in this listing.
 */
3377 static int mdt_upcall(const struct lu_context *ctx, struct md_device *md,
3378 enum md_upcall_event ev)
3380 struct mdt_device *m = mdt_dev(&md->md_lu_dev);
3381 struct md_device *next = m->mdt_child;
3382 struct mdt_thread_info *mti;
3388 rc = next->md_ops->mdo_maxsize_get(ctx, next,
3390 &m->mdt_max_cookiesize);
3391 CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
3392 m->mdt_max_mdsize, m->mdt_max_cookiesize);
3395 mti = lu_context_key_get(ctx, &mdt_thread_key);
3396 mti->mti_no_need_trans = 1;
3397 CDEBUG(D_INFO, "disable mdt trans for this thread\n");
3400 CERROR("invalid event\n");
/*
 * o_iocontrol handler.  Runs each ioctl inside a temporary LCT_MD_THREAD
 * lu_context.  Visible commands: sync the backing dt_device, sync then set
 * it read-only, and abort recovery; anything else is rejected with an
 * error message.
 *
 * NOTE(review): the switch header, first case label and break statements
 * are elided in this listing.
 */
3407 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3408 void *karg, void *uarg)
3410 struct lu_context ctxt;
3411 struct obd_device *obd= exp->exp_obd;
3412 struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
3413 struct dt_device *dt = mdt->mdt_bottom;
3417 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
3418 rc = lu_context_init(&ctxt, LCT_MD_THREAD);
3421 lu_context_enter(&ctxt);
3425 rc = dt->dd_ops->dt_sync(&ctxt, dt);
3428 case OBD_IOC_SET_READONLY:
/* Flush everything to disk before flipping the device read-only. */
3429 rc = dt->dd_ops->dt_sync(&ctxt, dt);
3430 dt->dd_ops->dt_ro(&ctxt, dt);
3433 case OBD_IOC_ABORT_RECOVERY:
3434 CERROR("aborting recovery for device %s\n", obd->obd_name);
3435 target_abort_recovery(obd);
3439 CERROR("not supported cmd = %d for device %s\n",
3440 cmd, obd->obd_name);
3444 lu_context_exit(&ctxt);
3445 lu_context_fini(&ctxt);
/*
 * Post-recovery hook: propagate recovery completion to the child device
 * stack via ldo_recovery_complete().
 */
3449 int mdt_postrecov(const struct lu_context *ctx, struct mdt_device *mdt)
3451 struct lu_device *ld = md2lu_dev(mdt->mdt_child);
3454 rc = ld->ld_ops->ldo_recovery_complete(ctx, ld);
/*
 * o_postrecov wrapper: build a temporary LCT_MD_THREAD context around
 * mdt_postrecov() for callers that only have the obd_device.
 */
3458 int mdt_obd_postrecov(struct obd_device *obd)
3460 struct lu_context ctxt;
3463 rc = lu_context_init(&ctxt, LCT_MD_THREAD);
3466 lu_context_enter(&ctxt);
3467 rc = mdt_postrecov(&ctxt, mdt_dev(obd->obd_lu_dev));
3468 lu_context_exit(&ctxt);
3469 lu_context_fini(&ctxt);
/* obd_ops vector registered with class_register_type() in mdt_mod_init(). */
3473 static struct obd_ops mdt_obd_device_ops = {
3474 .o_owner = THIS_MODULE,
3475 .o_connect = mdt_obd_connect,
3476 .o_reconnect = mdt_obd_reconnect,
3477 .o_disconnect = mdt_obd_disconnect,
3478 .o_init_export = mdt_init_export,
3479 .o_destroy_export = mdt_destroy_export,
3480 .o_iocontrol = mdt_iocontrol,
3481 .o_postrecov = mdt_obd_postrecov
/*
 * ldto_device_fini handler.  NOTE(review): the body beyond fetching the
 * mdt_device (presumably a call into mdt_fini() and the return value) is
 * elided in this listing.
 */
3485 static struct lu_device* mdt_device_fini(const struct lu_context *ctx,
3486 struct lu_device *d)
3488 struct mdt_device *m = mdt_dev(d);
/*
 * ldto_device_free handler.  NOTE(review): the OBD_FREE_PTR of @m is
 * elided in this listing.
 */
3494 static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
3496 struct mdt_device *m = mdt_dev(d);
/*
 * ldto_device_alloc handler: allocate an mdt_device, run mdt_init0() on it
 * and install mdt_upcall as the upcall from the child stack; allocation
 * failure yields ERR_PTR(-ENOMEM).
 *
 * NOTE(review): the OBD_ALLOC_PTR call and error branches are elided in
 * this listing.
 */
3501 static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
3502 struct lu_device_type *t,
3503 struct lustre_cfg *cfg)
3505 struct lu_device *l;
3506 struct mdt_device *m;
3512 l = &m->mdt_md_dev.md_lu_dev;
3513 rc = mdt_init0(ctx, m, t, cfg);
3519 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
3521 l = ERR_PTR(-ENOMEM);
3526 * context key constructor/destructor
/*
 * Per-thread mdt_thread_info allocator for the MD thread context key.
 * The CLASSERT guards against the struct growing past one page, which
 * would force high-order allocations.
 */
3528 static void *mdt_thread_init(const struct lu_context *ctx,
3529 struct lu_context_key *key)
3531 struct mdt_thread_info *info;
3534 * check that no high order allocations are incurred.
3536 CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
3537 OBD_ALLOC_PTR(info);
3539 info = ERR_PTR(-ENOMEM);
/* Matching destructor: releases the per-thread info.  NOTE(review): the
 * OBD_FREE_PTR line is elided in this listing. */
3543 static void mdt_thread_fini(const struct lu_context *ctx,
3544 struct lu_context_key *key, void *data)
3546 struct mdt_thread_info *info = data;
/* Context key tying mdt_thread_info to every LCT_MD_THREAD context. */
3550 struct lu_context_key mdt_thread_key = {
3551 .lct_tags = LCT_MD_THREAD,
3552 .lct_init = mdt_thread_init,
3553 .lct_fini = mdt_thread_fini
/*
 * Per-transaction mdt_txn_info allocator for the TX handle context key;
 * same one-page size guard as the thread key.  NOTE(review): the
 * allocation call is elided in this listing.
 */
3556 static void *mdt_txn_init(const struct lu_context *ctx,
3557 struct lu_context_key *key)
3559 struct mdt_txn_info *txi;
3562 * check that no high order allocations are incurred.
3564 CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
3567 txi = ERR_PTR(-ENOMEM);
/* Matching destructor for the txn key data.  NOTE(review): the free call
 * is elided in this listing. */
3571 static void mdt_txn_fini(const struct lu_context *ctx,
3572 struct lu_context_key *key, void *data)
3574 struct mdt_txn_info *txi = data;
/* Context key tying mdt_txn_info to every LCT_TX_HANDLE context. */
3578 struct lu_context_key mdt_txn_key = {
3579 .lct_tags = LCT_TX_HANDLE,
3580 .lct_init = mdt_txn_init,
3581 .lct_fini = mdt_txn_fini
/*
 * ldto_init: register the thread and txn context keys when the device
 * type is loaded.  NOTE(review): the rc check between the two
 * registrations is elided in this listing.
 */
3585 static int mdt_type_init(struct lu_device_type *t)
3589 rc = lu_context_key_register(&mdt_thread_key);
3591 rc = lu_context_key_register(&mdt_txn_key);
/* ldto_fini: deregister both keys in reverse registration order. */
3595 static void mdt_type_fini(struct lu_device_type *t)
3597 lu_context_key_degister(&mdt_thread_key);
3598 lu_context_key_degister(&mdt_txn_key);
/* Device-type lifecycle operations for the MDT lu_device_type. */
3601 static struct lu_device_type_operations mdt_device_type_ops = {
3602 .ldto_init = mdt_type_init,
3603 .ldto_fini = mdt_type_fini,
3605 .ldto_device_alloc = mdt_device_alloc,
3606 .ldto_device_free = mdt_device_free,
3607 .ldto_device_fini = mdt_device_fini
/* The MDT device type itself: an MD-layer device run in MD-thread contexts. */
3610 static struct lu_device_type mdt_device_type = {
3611 .ldt_tags = LU_DEVICE_MD,
3612 .ldt_name = LUSTRE_MDT_NAME,
3613 .ldt_ops = &mdt_device_type_ops,
3614 .ldt_ctx_tags = LCT_MD_THREAD
/* Per-obd /proc entries: uuid, recovery status and export count. */
3617 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
3618 { "uuid", lprocfs_rd_uuid, 0, 0 },
3619 { "recovery_status", lprocfs_obd_rd_recovery_status, 0, 0 },
3620 { "num_exports", lprocfs_rd_num_exports, 0, 0 },
/* Module-level /proc entries: reference count only. */
3624 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
3625 { "num_refs", lprocfs_rd_numrefs, 0, 0 },
3629 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
/*
 * Module entry point: set the default service-thread count and register
 * the "mdt" obd type with its lprocfs variables.  NOTE(review): the
 * mdt_device_type argument to class_register_type() is elided in this
 * listing.
 */
3631 static int __init mdt_mod_init(void)
3633 struct lprocfs_static_vars lvars;
3636 printk(KERN_INFO "Lustre: MetaData Target; info@clusterfs.com\n");
3638 mdt_num_threads = MDT_NUM_THREADS;
3639 lprocfs_init_vars(mdt, &lvars);
3640 rc = class_register_type(&mdt_obd_device_ops, NULL,
3641 lvars.module_vars, LUSTRE_MDT_NAME,
/* Module exit point: unregister the obd type. */
3646 static void __exit mdt_mod_exit(void)
3648 class_unregister_type(LUSTRE_MDT_NAME);
/*
 * Handler-table helper macros.  DEF_HNDL() builds one mdt_handler entry,
 * indexed by (opcode - base opcode) so each table is dense over its opcode
 * range; the per-subsystem wrappers below just fix the prefix/base/fail-id
 * suffix.  The _F variants attach a known request format (&RQF_*), the _0
 * variants leave the format NULL for requests whose format is not yet
 * described.
 */
3652 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt) \
3653 [prefix ## _ ## opc - prefix ## _ ## base] = { \
3655 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
3656 .mh_opc = prefix ## _ ## opc, \
3657 .mh_flags = flags, \
3662 #define DEF_MDT_HNDL(flags, name, fn, fmt) \
3663 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
3665 #define DEF_SEQ_HNDL(flags, name, fn, fmt) \
3666 DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, fmt)
3668 #define DEF_FLD_HNDL(flags, name, fn, fmt) \
3669 DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, fmt)
3671 * Request with a format known in advance
3673 #define DEF_MDT_HNDL_F(flags, name, fn) \
3674 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
3676 #define DEF_SEQ_HNDL_F(flags, name, fn) \
3677 DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)
3679 #define DEF_FLD_HNDL_F(flags, name, fn) \
3680 DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, &RQF_FLD_ ## name)
3682 * Request with a format we do not yet know
3684 #define DEF_MDT_HNDL_0(flags, name, fn) \
3685 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
/*
 * MDS opcode handler table.  Flag semantics (from the macro names):
 * HABEO_CORPUS — request carries an object FID the handler operates on;
 * HABEO_REFERO — reply buffer is prepacked; MUTABOR — handler modifies
 * the filesystem.  NOTE(review): the REINT entry between SETATTR-style
 * flags and CLOSE appears truncated in this listing (line 3698 elided).
 */
3687 static struct mdt_handler mdt_mds_ops[] = {
3688 DEF_MDT_HNDL_F(0, CONNECT, mdt_connect),
3689 DEF_MDT_HNDL_F(0, DISCONNECT, mdt_disconnect),
3690 DEF_MDT_HNDL_F(0 |HABEO_REFERO, GETSTATUS, mdt_getstatus),
3691 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR, mdt_getattr),
3692 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
3693 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO|MUTABOR,
3694 SETXATTR, mdt_setxattr),
3695 DEF_MDT_HNDL_F(HABEO_CORPUS, GETXATTR, mdt_getxattr),
3696 DEF_MDT_HNDL_F(0 |HABEO_REFERO, STATFS, mdt_statfs),
3697 DEF_MDT_HNDL_F(0 |MUTABOR,
3699 DEF_MDT_HNDL_F(HABEO_CORPUS , CLOSE, mdt_close),
3700 DEF_MDT_HNDL_F(HABEO_CORPUS , DONE_WRITING, mdt_done_writing),
3701 DEF_MDT_HNDL_F(0 |HABEO_REFERO, PIN, mdt_pin),
3702 DEF_MDT_HNDL_0(0, SYNC, mdt_sync),
3703 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR, mdt_is_subdir),
3704 DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_quotacheck_handle),
3705 DEF_MDT_HNDL_0(0, QUOTACTL, mdt_quotactl_handle)
/* OBD opcode handlers (ping, log cancel, quota callback), based at OBD_PING. */
3708 #define DEF_OBD_HNDL(flags, name, fn) \
3709 DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
3712 static struct mdt_handler mdt_obd_ops[] = {
3713 DEF_OBD_HNDL(0, PING, mdt_obd_ping),
3714 DEF_OBD_HNDL(0, LOG_CANCEL, mdt_obd_log_cancel),
3715 DEF_OBD_HNDL(0, QC_CALLBACK, mdt_obd_qc_callback)
/*
 * LDLM opcode handlers, based at LDLM_ENQUEUE.  HABEO_CLAVIS marks
 * handlers that carry lock handles in the request.
 */
3718 #define DEF_DLM_HNDL_0(flags, name, fn) \
3719 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
3720 #define DEF_DLM_HNDL_F(flags, name, fn) \
3721 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
3723 static struct mdt_handler mdt_dlm_ops[] = {
3724 DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
3725 DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT, mdt_convert),
3726 DEF_DLM_HNDL_0(0, BL_CALLBACK, mdt_bl_callback),
3727 DEF_DLM_HNDL_0(0, CP_CALLBACK, mdt_cp_callback)
/* Llog handlers: empty here — entries, if any, are elided in this listing. */
3730 static struct mdt_handler mdt_llog_ops[] = {
/* Security-context handlers; all three opcodes share one handler. */
3733 #define DEF_SEC_CTX_HNDL(name, fn) \
3734 DEF_HNDL(SEC_CTX, INIT, _NET, 0, name, fn, NULL)
3736 static struct mdt_handler mdt_sec_ctx_ops[] = {
3737 DEF_SEC_CTX_HNDL(INIT, mdt_sec_ctx_handle),
3738 DEF_SEC_CTX_HNDL(INIT_CONT, mdt_sec_ctx_handle),
3739 DEF_SEC_CTX_HNDL(FINI, mdt_sec_ctx_handle)
/*
 * Dispatch table for the regular MDT service: maps each opcode range
 * (MDS, OBD, LDLM, LLOG, SEC) to its handler array.
 */
3742 static struct mdt_opc_slice mdt_regular_handlers[] = {
3744 .mos_opc_start = MDS_GETATTR,
3745 .mos_opc_end = MDS_LAST_OPC,
3746 .mos_hs = mdt_mds_ops
3749 .mos_opc_start = OBD_PING,
3750 .mos_opc_end = OBD_LAST_OPC,
3751 .mos_hs = mdt_obd_ops
3754 .mos_opc_start = LDLM_ENQUEUE,
3755 .mos_opc_end = LDLM_LAST_OPC,
3756 .mos_hs = mdt_dlm_ops
3759 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
3760 .mos_opc_end = LLOG_LAST_OPC,
3761 .mos_hs = mdt_llog_ops
3764 .mos_opc_start = SEC_CTX_INIT,
3765 .mos_opc_end = SEC_LAST_OPC,
3766 .mos_hs = mdt_sec_ctx_ops
/*
 * Handlers for the readpage service: bulk readdir (and writepage with
 * split support), plus CLOSE/DONE_WRITING duplicated here — see the
 * in-line XXX referencing mdc_close() for why.
 */
3773 static struct mdt_handler mdt_readpage_ops[] = {
3774 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
3775 #ifdef HAVE_SPLIT_SUPPORT
3776 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
3780 * XXX: this is ugly and should be fixed one day, see mdc_close() for
3781 * detailed comments. --umka
3783 DEF_MDT_HNDL_F(HABEO_CORPUS, CLOSE, mdt_close),
3784 DEF_MDT_HNDL_F(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
/* Dispatch table for the readpage service (MDS opcode range only). */
3787 static struct mdt_opc_slice mdt_readpage_handlers[] = {
3789 .mos_opc_start = MDS_GETATTR,
3790 .mos_opc_end = MDS_LAST_OPC,
3791 .mos_hs = mdt_readpage_ops
/*
 * SEQ and FLD service handler tables.  seq_query/fld_query take their own
 * request types, hence the cast to the generic mdt handler signature.
 */
3798 static struct mdt_handler mdt_seq_ops[] = {
3799 DEF_SEQ_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))seq_query)
3802 static struct mdt_opc_slice mdt_seq_handlers[] = {
3804 .mos_opc_start = SEQ_QUERY,
3805 .mos_opc_end = SEQ_LAST_OPC,
3806 .mos_hs = mdt_seq_ops
3813 static struct mdt_handler mdt_fld_ops[] = {
3814 DEF_FLD_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))fld_query)
3817 static struct mdt_opc_slice mdt_fld_handlers[] = {
3819 .mos_opc_start = FLD_QUERY,
3820 .mos_opc_end = FLD_LAST_OPC,
3821 .mos_hs = mdt_fld_ops
/* Kernel module metadata, the mdt_num_threads parameter, and registration
 * of the init/exit entry points declared above. */
3828 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3829 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
3830 MODULE_LICENSE("GPL");
3832 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
3833 "number of mdt service threads to start");
3835 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);