1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/mdt/mdt_handler.c
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
13 * Author: Huang Hua <huanghua@clusterfs.com>
15 * This file is part of the Lustre file system, http://www.lustre.org
16 * Lustre is a trademark of Cluster File Systems, Inc.
18 * You may have signed or agreed to another license before downloading
19 * this software. If so, you are bound by the terms and conditions
20 * of that agreement, and the following does not apply to you. See the
21 * LICENSE file included with this distribution for more information.
23 * If you did not agree to a different license, then this copy of Lustre
24 * is open source software; you can redistribute it and/or modify it
25 * under the terms of version 2 of the GNU General Public License as
26 * published by the Free Software Foundation.
28 * In either case, Lustre is distributed in the hope that it will be
29 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 * license text for more details.
35 # define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_MDS
39 #include <linux/module.h>
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
44 * struct OBD_{ALLOC,FREE}*()
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
55 #include <dt_object.h>
56 #include <lustre_mds.h>
57 #include <lustre_mdt.h>
58 #include "mdt_internal.h"
59 #include <linux/lustre_acl.h>
60 #include <lustre_param.h>
/* NOTE(review): this chunk is a lossy extract — embedded original line
 * numbers skip, so braces/members are elided.  Code kept verbatim. */
/* Module tunable: number of MDT service threads (set in mdt_mod_init()). */
62 * Initialized in mdt_mod_init().
64 unsigned long mdt_num_threads;
66 /* ptlrpc request handler for MDT. All handlers are
67 * grouped into several slices - struct mdt_opc_slice,
68 * and stored in an array - mdt_handlers[].
/* Fields of struct mdt_handler (struct opener elided in this extract). */
71 /* The name of this handler. */
73 /* Fail id for this handler, checked at the beginning of this handler*/
75 /* Operation code for this handler */
77 /* flags are listed in enum mdt_handler_flags below. */
79 /* The actual handler function to execute. */
80 int (*mh_act)(struct mdt_thread_info *info);
81 /* Request format for this request. */
82 const struct req_format *mh_fmt;
/* Per-handler behavior flags; names are Latin mnemonics (see comments). */
85 enum mdt_handler_flags {
87 * struct mdt_body is passed in the incoming message, and object
88 * identified by this fid exists on disk.
90 * "habeo corpus" == "I have a body"
92 HABEO_CORPUS = (1 << 0),
94 * struct ldlm_request is passed in the incoming message.
96 * "habeo clavis" == "I have a key"
98 HABEO_CLAVIS = (1 << 1),
100 * this request has fixed reply format, so that reply message can be
101 * packed by generic code.
103 * "habeo refero" == "I have a reply"
105 HABEO_REFERO = (1 << 2),
107 * this request will modify something, so check whether the filesystem
108 * is readonly or not, then return -EROFS to client asap if necessary.
110 * "mutabor" == "I shall modify"
/* A slice maps a contiguous opcode range onto an array of handlers. */
115 struct mdt_opc_slice {
118 struct mdt_handler *mos_hs;
/* Handler tables for the regular/readpage/seq/fld services (defined below). */
121 static struct mdt_opc_slice mdt_regular_handlers[];
122 static struct mdt_opc_slice mdt_readpage_handlers[];
123 static struct mdt_opc_slice mdt_seq_handlers[];
124 static struct mdt_opc_slice mdt_fld_handlers[];
/* Forward declarations for helpers defined later in this file. */
126 static struct mdt_device *mdt_dev(struct lu_device *d);
127 static int mdt_regular_handle(struct ptlrpc_request *req);
128 static int mdt_recovery_handle(struct ptlrpc_request *req);
129 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
131 static struct lu_object_operations mdt_obj_ops;
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
/* Test whether disposition bit @flag is set in the ldlm intent reply. */
133 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
137 return (rep->lock_policy_res1 & flag);
/* Clear disposition @flag in the request opdata and in the ldlm reply
 * (the reply update is likely guarded by a rep != NULL check on an
 * elided line — verify against the full source). */
140 void mdt_clear_disposition(struct mdt_thread_info *info,
141 struct ldlm_reply *rep, int flag)
144 info->mti_opdata &= ~flag;
146 rep->lock_policy_res1 &= ~flag;
/* Set disposition @flag in the request opdata and in the ldlm reply. */
149 void mdt_set_disposition(struct mdt_thread_info *info,
150 struct ldlm_reply *rep, int flag)
153 info->mti_opdata |= flag;
155 rep->lock_policy_res1 |= flag;
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
/* MDS_GETSTATUS handler: ask the child md device for the root fid and
 * pack it into the reply mdt_body. */
158 static int mdt_getstatus(struct mdt_thread_info *info)
160 struct md_device *next = info->mti_mdt->mdt_child;
162 struct mdt_body *body;
166 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
169 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
170 rc = next->md_ops->mdo_root_get(info->mti_env, next,
173 body->valid |= OBD_MD_FLID;
/* MDS_STATFS handler: query fs statistics from the lower layer and pack
 * them into the reply's obd_statfs buffer. */
179 static int mdt_statfs(struct mdt_thread_info *info)
181 struct md_device *next = info->mti_mdt->mdt_child;
182 struct obd_statfs *osfs;
187 /* This will trigger a watchdog timeout */
188 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
189 (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
192 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
195 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
196 /* XXX max_age optimisation is needed here. See mds_statfs */
197 rc = next->md_ops->mdo_statfs(info->mti_env, next,
199 statfs_pack(osfs, &info->mti_u.ksfs);
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
/* Copy size/blocks into the reply body for regular files when
 * Size-on-MDS is enabled for this object. */
205 void mdt_pack_size2body(struct mdt_body *b, const struct lu_attr *attr,
206 struct mdt_object *o)
208 /* Check if Size-on-MDS is enabled. */
209 if (S_ISREG(attr->la_mode) && mdt_sizeonmds_enabled(o)) {
210 b->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
211 b->size = attr->la_size;
212 b->blocks = attr->la_blocks;
/* Translate lu_attr into the wire-format mdt_body, setting OBD_MD_*
 * valid flags.  Size/blocks are only claimed valid for non-regular
 * files here (regular-file size comes from OSTs or Size-on-MDS). */
216 void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
217 const struct lu_fid *fid)
219 /*XXX should pack the reply body according to lu_valid*/
220 b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID |
221 OBD_MD_FLGID | OBD_MD_FLTYPE |
222 OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
223 OBD_MD_FLATIME | OBD_MD_FLMTIME ;
225 if (!S_ISREG(attr->la_mode))
226 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
228 b->atime = attr->la_atime;
229 b->mtime = attr->la_mtime;
230 b->ctime = attr->la_ctime;
231 b->mode = attr->la_mode;
232 b->size = attr->la_size;
233 b->blocks = attr->la_blocks;
234 b->uid = attr->la_uid;
235 b->gid = attr->la_gid;
236 b->flags = attr->la_flags;
237 b->nlink = attr->la_nlink;
238 b->rdev = attr->la_rdev;
242 b->valid |= OBD_MD_FLID;
243 CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
244 PFID(fid), b->nlink, b->mode, b->size);
/* True when the request asks for striping EA: LOV EA on regular files,
 * or directory EA on directories. */
248 static inline int mdt_body_has_lov(const struct lu_attr *la,
249 const struct mdt_body *body)
251 return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
252 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
/* NOTE(review): lossy extract — many lines elided (braces, RETURNs,
 * else branches); code kept verbatim, only comments added. */
/* Core of getattr processing: fetch attributes (and optionally LOV/LMV
 * EA, symlink target, ACL, remote perms, MDS capability) for @o and
 * pack them into the reply capsule. */
255 static int mdt_getattr_internal(struct mdt_thread_info *info,
256 struct mdt_object *o)
258 struct md_object *next = mdt_object_child(o);
259 struct mdt_device *mdt = info->mti_mdt;
260 const struct mdt_body *reqbody = info->mti_body;
261 struct ptlrpc_request *req = mdt_info_req(info);
262 struct md_attr *ma = &info->mti_attr;
263 struct lu_attr *la = &ma->ma_attr;
264 struct req_capsule *pill = &info->mti_pill;
265 const struct lu_env *env = info->mti_env;
266 struct mdt_body *repbody;
267 struct lu_buf *buffer = &info->mti_buf;
271 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
274 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
275 repbody->eadatasize = 0;
276 repbody->aclsize = 0;
/* Choose which EA to fetch: LMV for split dirs, otherwise LOV. */
278 if (reqbody->valid & OBD_MD_MEA) {
279 /* Assumption: MDT_MD size is enough for lmv size FIXME */
280 ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD);
281 ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD,
283 ma->ma_need = MA_INODE | MA_LMV;
285 ma->ma_need = MA_INODE | MA_LOV ;
286 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
287 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
290 rc = mo_attr_get(env, next, ma);
291 if (rc == -EREMOTE) {
292 /* This object is located on remote node.*/
293 repbody->fid1 = *mdt_object_fid(o);
294 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
297 CERROR("getattr error for "DFID": %d\n",
298 PFID(mdt_object_fid(o)), rc);
302 if (ma->ma_valid & MA_INODE) {
303 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
304 mdt_body_reverse_idmap(info, repbody);
/* Pack striping EA (LOV or LMV) if present, else symlink target. */
309 if (mdt_body_has_lov(la, reqbody)) {
310 if (ma->ma_valid & MA_LOV) {
311 LASSERT(ma->ma_lmm_size);
312 mdt_dump_lmm(D_INFO, ma->ma_lmm);
313 repbody->eadatasize = ma->ma_lmm_size;
314 if (S_ISDIR(la->la_mode))
315 repbody->valid |= OBD_MD_FLDIREA;
317 repbody->valid |= OBD_MD_FLEASIZE;
319 if (ma->ma_valid & MA_LMV) {
320 LASSERT(S_ISDIR(la->la_mode));
321 repbody->eadatasize = ma->ma_lmv_size;
322 repbody->valid |= OBD_MD_FLDIREA;
323 repbody->valid |= OBD_MD_MEA;
325 } else if (S_ISLNK(la->la_mode) &&
326 reqbody->valid & OBD_MD_LINKNAME) {
/* Read the symlink target into the MD buffer; rc is the length. */
327 buffer->lb_buf = ma->ma_lmm;
328 buffer->lb_len = ma->ma_lmm_size;
329 rc = mo_readlink(env, next, buffer);
331 CERROR("readlink failed: %d\n", rc);
334 repbody->valid |= OBD_MD_LINKNAME;
335 repbody->eadatasize = rc + 1;
336 ((char*)ma->ma_lmm)[rc] = 0; /* NULL terminate */
337 CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
338 (char*)ma->ma_lmm, rc);
/* Report server-side maximum MD/cookie sizes when asked. */
343 if (reqbody->valid & OBD_MD_FLMODEASIZE) {
344 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
345 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
346 repbody->valid |= OBD_MD_FLMODEASIZE;
347 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
348 "MAX_COOKIE to : %d:%d\n",
350 repbody->max_cookiesize);
/* Remote permissions (cross-realm uid/gid mapping) or POSIX ACL. */
353 if (reqbody->valid & OBD_MD_FLRMTPERM) {
354 buffer->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
355 /* mdt_getattr_lock only */
356 rc = mdt_pack_remote_perm(info, o, buffer);
359 repbody->valid |= OBD_MD_FLRMTPERM;
360 repbody->aclsize = sizeof(struct mdt_remote_perm);
362 #ifdef CONFIG_FS_POSIX_ACL
363 else if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
364 (reqbody->valid & OBD_MD_FLACL)) {
365 buffer->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
366 buffer->lb_len = req_capsule_get_size(pill,
367 &RMF_ACL, RCL_SERVER);
368 if (buffer->lb_len > 0) {
369 rc = mo_xattr_get(env, next, buffer,
370 XATTR_NAME_ACL_ACCESS);
/* -ENODATA/-EOPNOTSUPP mean "no ACL" and are not fatal. */
372 if (rc == -ENODATA || rc == -EOPNOTSUPP)
375 CERROR("got acl size: %d\n", rc);
377 repbody->aclsize = rc;
378 repbody->valid |= OBD_MD_FLACL;
/* MDS capability: snapshot the capa key under capa_lock, then ask the
 * lower layer to fill in the capability. */
384 if ((reqbody->valid & OBD_MD_FLMDSCAPA) && mdt->mdt_opts.mo_mds_capa) {
385 struct lustre_capa *capa;
387 spin_lock(&capa_lock);
/* NOTE(review): "red_capa_key" looks like a garbled identifier
 * (expected something like mdt_capa_key) — verify against source. */
388 info->mti_capa_key = *red_capa_key(mdt);
389 spin_unlock(&capa_lock);
391 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
393 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
394 rc = mo_capa_get(env, next, capa);
397 repbody->valid |= OBD_MD_FLMDSCAPA;
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
/* MDS_GETATTR handler: getattr by fid on an existing object.  Sets up
 * user credentials only when remote permissions are requested. */
403 static int mdt_getattr(struct mdt_thread_info *info)
406 struct mdt_object *obj;
407 struct mdt_body *reqbody;
409 obj = info->mti_object;
410 LASSERT(obj != NULL);
411 LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
414 reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
418 if (reqbody->valid & OBD_MD_FLRMTPERM) {
419 rc = mdt_init_ucred(info, reqbody);
424 rc = mdt_getattr_internal(info, obj);
/* Trim unused reply buffers (EA/ACL) before sending. */
425 mdt_shrink_reply(info, REPLY_REC_OFF + 1, 1, 0);
426 if (reqbody->valid & OBD_MD_FLRMTPERM)
427 mdt_exit_ucred(info);
/* MDS_IS_SUBDIR handler: check whether fid2 is an ancestor of the
 * current object; result (and last-checked parent fid, for the remote
 * case) is packed into the reply body. */
431 static int mdt_is_subdir(struct mdt_thread_info *info)
433 struct mdt_object *obj = info->mti_object;
434 struct req_capsule *pill = &info->mti_pill;
435 struct mdt_body *repbody;
438 obj = info->mti_object;
439 LASSERT(obj != NULL);
440 LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
443 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
446 * We save last checked parent fid to @repbody->fid1 for remote
449 LASSERT(fid_is_sane(&info->mti_body->fid2));
450 rc = mdo_is_subdir(info->mti_env, mdt_object_child(obj),
451 &info->mti_body->fid2, &repbody->fid1);
456 * Save error code to ->mode. Later it is used for detecting the case
460 repbody->valid = OBD_MD_FLMODE;
463 repbody->valid |= OBD_MD_FLID;
/* NOTE(review): lossy extract — many lines elided (braces, gotos,
 * EXIT paths); code kept verbatim, only comments added. */
470 * UPDATE lock should be taken against parent, and be release before exit;
471 * child_bits lock should be taken against child, and be returned back:
472 * (1)normal request should release the child lock;
473 * (2)intent request will grant the lock to client.
475 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
476 struct mdt_lock_handle *lhc,
478 struct ldlm_reply *ldlm_rep)
480 struct ptlrpc_request *req = mdt_info_req(info);
481 struct mdt_object *parent = info->mti_object;
482 struct mdt_object *child;
483 struct md_object *next = mdt_object_child(info->mti_object);
484 struct lu_fid *child_fid = &info->mti_tmp_fid1;
487 struct mdt_lock_handle *lhp;
488 struct ldlm_lock *lock;
/* A used lock handle in lhc means this is a RESENT request: reuse the
 * existing lock instead of enqueueing a new one. */
491 is_resent = lustre_handle_is_used(&lhc->mlh_lh);
493 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
495 LASSERT(info->mti_object != NULL);
496 name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
500 CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
501 PFID(mdt_object_fid(parent)), name, ldlm_rep);
503 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
/* Empty name: cross-ref case — the parent lives on another MDS and we
 * only getattr + lock the child here. */
504 if (strlen(name) == 0) {
505 /* only getattr on the child. parent is on another node. */
506 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
508 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
509 ", ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep);
512 /* Do not take lock for resent case. */
513 lock = ldlm_handle2lock(&lhc->mlh_lh);
515 CERROR("Invalid lock handle "LPX64"\n",
519 LASSERT(fid_res_name_eq(mdt_object_fid(child),
520 &lock->l_resource->lr_name));
524 mdt_lock_handle_init(lhc);
525 lhc->mlh_mode = LCK_CR;
528 * Object's name is on another MDS, no lookup lock is
529 * needed here but update is.
531 child_bits &= ~MDS_INODELOCK_LOOKUP;
532 child_bits |= MDS_INODELOCK_UPDATE;
533 rc = mdt_object_lock(info, child, lhc, child_bits);
536 /* Finally, we can get attr for child. */
537 rc = mdt_getattr_internal(info, child);
539 mdt_object_unlock(info, child, lhc, 1);
544 /*step 1: lock parent */
545 lhp = &info->mti_lh[MDT_LH_PARENT];
546 lhp->mlh_mode = LCK_CR;
547 rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
551 /*step 2: lookup child's fid by name */
552 rc = mdo_lookup(info->mti_env, next, name, child_fid);
555 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
556 GOTO(out_parent, rc);
558 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
560 * step 3: find the child object by fid & lock it,
561 * regardless if it is local or remote.
563 child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
566 GOTO(out_parent, rc = PTR_ERR(child));
568 /* Do not take lock for resent case. */
569 lock = ldlm_handle2lock(&lhc->mlh_lh);
571 CERROR("Invalid lock handle "LPX64"\n",
575 LASSERT(fid_res_name_eq(child_fid,
576 &lock->l_resource->lr_name));
579 mdt_lock_handle_init(lhc);
580 lhc->mlh_mode = LCK_CR;
/* cr_lock handles cross-ref children (see mdt_object_cr_lock below). */
581 rc = mdt_object_cr_lock(info, child, lhc, child_bits);
586 /* finally, we can get attr for child. */
587 rc = mdt_getattr_internal(info, child);
589 mdt_object_unlock(info, child, lhc, 1);
591 struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh);
593 struct ldlm_res_id *res_id;
594 struct mdt_body *repbody;
597 /* Debugging code. */
598 res_id = &lock->l_resource->lr_name;
599 LDLM_DEBUG(lock, "we will return this lock client\n");
600 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
601 &lock->l_resource->lr_name),
602 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
603 (unsigned long)res_id->name[0],
604 (unsigned long)res_id->name[1],
605 (unsigned long)res_id->name[2],
606 PFID(mdt_object_fid(child)));
608 /* Pack Size-on-MDS inode attributes to the body if
609 * update lock is given. */
610 repbody = req_capsule_server_get(&info->mti_pill,
612 ma = &info->mti_attr.ma_attr;
613 if (lock->l_policy_data.l_inodebits.bits &
614 MDS_INODELOCK_UPDATE)
615 mdt_pack_size2body(repbody, ma, child);
623 mdt_object_put(info->mti_env, child);
625 mdt_object_unlock(info, parent, lhp, 1);
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
630 /* normal handler: should release the child lock */
/* MDS_GETATTR_NAME handler: getattr by (parent, name); unlike the
 * intent path, the child lock is dropped before replying. */
631 static int mdt_getattr_name(struct mdt_thread_info *info)
633 struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
634 struct mdt_body *reqbody;
639 reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
643 rc = mdt_init_ucred(info, reqbody);
647 rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
/* Release the child lock taken above; no intent reply to grant it to. */
648 if (lustre_handle_is_used(&lhc->mlh_lh)) {
649 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
650 lhc->mlh_lh.cookie = 0;
652 mdt_shrink_reply(info, REPLY_REC_OFF + 1, 1, 0);
653 mdt_exit_ucred(info);
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
657 static struct lu_device_operations mdt_lu_ops;
/* True iff @d is an MDT device (identified by its ops vector). */
659 static int lu_device_is_mdt(struct lu_device *d)
661 return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
/* Downcast lu_device -> mdt_device (asserted). */
664 static inline struct mdt_device *mdt_dev(struct lu_device *d)
666 LASSERT(lu_device_is_mdt(d));
667 return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
/* MDS_CONNECT handler: establish the export, then set up identity
 * mapping for the new client. */
670 static int mdt_connect(struct mdt_thread_info *info)
673 struct ptlrpc_request *req;
675 req = mdt_info_req(info);
676 rc = target_handle_connect(req, mdt_recovery_handle);
678 LASSERT(req->rq_export != NULL);
679 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
680 rc = mdt_init_idmap(info);
/* MDS_DISCONNECT handler: delegate to generic target code. */
685 static int mdt_disconnect(struct mdt_thread_info *info)
687 return target_handle_disconnect(mdt_info_req(info));
/* NOTE(review): lossy extract — some lines elided (error labels,
 * RETURN); code kept verbatim. */
/* Push the pages collected in @rdpg to the client via a bulk PUT;
 * evicts the client export on bulk failure. */
690 static int mdt_sendpage(struct mdt_thread_info *info,
691 struct lu_rdpg *rdpg)
693 struct ptlrpc_request *req = mdt_info_req(info);
694 struct ptlrpc_bulk_desc *desc;
695 struct l_wait_info *lwi = &info->mti_u.rdpg.mti_wait_info;
702 desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
705 GOTO(out, rc = -ENOMEM);
/* Attach each page; the last page may be partial (tmpsize). */
707 for (i = 0, tmpcount = rdpg->rp_count;
708 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
709 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
710 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
713 LASSERT(desc->bd_nob == rdpg->rp_count);
714 rc = ptlrpc_start_bulk_transfer(desc);
718 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
719 GOTO(abort_bulk, rc);
/* Wait (bounded) for the bulk transfer to complete. */
721 *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
722 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
723 LASSERT (rc == 0 || rc == -ETIMEDOUT);
726 if (desc->bd_success &&
727 desc->bd_nob_transferred == rdpg->rp_count)
730 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
733 DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
734 (rc == -ETIMEDOUT) ? "timeout" : "network error",
735 desc->bd_nob_transferred, rdpg->rp_count,
736 req->rq_export->exp_client_uuid.uuid,
737 req->rq_export->exp_connection->c_remote_uuid.uuid);
739 class_fail_export(req->rq_export);
743 ptlrpc_abort_bulk(desc);
745 ptlrpc_free_bulk(desc);
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
750 #ifdef HAVE_SPLIT_SUPPORT
752 * Retrieve dir entry from the page and insert it to the
753 * slave object, actually, this should be in osd layer,
754 * but since it will not in the final product, so just do
755 * it here and do not define more moo api anymore for
/* Walk lu_dirent records in @page and insert each name (except "." and
 * "..") into the slave directory object.  Temporary split-support code. */
758 static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
760 struct mdt_object *object = info->mti_object;
761 struct lu_dirpage *dp;
762 struct lu_dirent *ent;
767 /* Disable trans for this name insert, since it will
768 * include many trans for this */
769 info->mti_no_need_trans = 1;
771 dp = page_address(page);
772 for (ent = lu_dirent_start(dp); ent != NULL;
773 ent = lu_dirent_next(ent)) {
774 struct lu_fid *lf = &ent->lde_fid;
776 /* FIXME: multi-trans for this name insert */
777 if (strncmp(ent->lde_name, ".", ent->lde_namelen) &&
778 strncmp(ent->lde_name, "..", ent->lde_namelen)) {
780 /* FIXME: Here we allocate name for each name,
781 * maybe stupid, but can not find better way.
782 * will find better way */
/* NOTE(review): OBD_ALLOC result is used unchecked on the visible
 * lines; the NUL terminator relies on OBD_ALLOC zero-filling. */
783 OBD_ALLOC(name, ent->lde_namelen + 1);
784 memcpy(name, ent->lde_name, ent->lde_namelen);
785 rc = mdo_name_insert(info->mti_env,
786 md_object_next(&object->mot_obj),
788 OBD_FREE(name, ent->lde_namelen + 1);
/* Bulk-wait timeout callback: just log; the waiter handles cleanup. */
798 static int mdt_bulk_timeout(void *data)
802 CERROR("mdt bulk transfer timeout \n");
/* NOTE(review): lossy extract — some lines elided (labels, RETURN);
 * code kept verbatim. */
/* MDS_WRITEPAGE handler: bulk-GET one page of directory entries from
 * the client (offset in reqbody->size, length in reqbody->nlink) and
 * replay it into the directory via mdt_write_dir_page(). */
807 static int mdt_writepage(struct mdt_thread_info *info)
809 struct ptlrpc_request *req = mdt_info_req(info);
810 struct mdt_body *reqbody;
811 struct l_wait_info *lwi;
812 struct ptlrpc_bulk_desc *desc;
818 reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
822 desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
826 /* allocate the page for the desc */
827 page = alloc_pages(GFP_KERNEL, 0);
829 GOTO(desc_cleanup, rc = -ENOMEM);
831 CDEBUG(D_INFO, "Received page offset %d size %d \n",
832 (int)reqbody->size, (int)reqbody->nlink);
/* reqbody->size/nlink are overloaded as offset/length here. */
834 ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size,
835 (int)reqbody->nlink);
837 /* FIXME: following parts are copied from ost_brw_write */
839 /* Check if client was evicted while we were doing i/o before touching
843 GOTO(cleanup_page, rc = -ENOMEM);
845 if (desc->bd_export->exp_failed)
848 rc = ptlrpc_start_bulk_transfer (desc);
850 *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
851 mdt_bulk_timeout, desc);
852 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
853 desc->bd_export->exp_failed, lwi);
854 LASSERT(rc == 0 || rc == -ETIMEDOUT);
855 if (rc == -ETIMEDOUT) {
856 DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
857 ptlrpc_abort_bulk(desc);
858 } else if (desc->bd_export->exp_failed) {
859 DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
861 ptlrpc_abort_bulk(desc);
862 } else if (!desc->bd_success ||
863 desc->bd_nob_transferred != desc->bd_nob) {
864 DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
866 "truncated" : "network error on",
867 desc->bd_nob_transferred, desc->bd_nob);
868 /* XXX should this be a different errno? */
872 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
875 GOTO(cleanup_lwi, rc);
876 rc = mdt_write_dir_page(info, page);
881 __free_pages(page, 0);
883 ptlrpc_free_bulk(desc);
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
/* MDS_READPAGE handler: read directory pages starting at hash offset
 * reqbody->size, reqbody->nlink bytes total, and bulk-PUT them back. */
888 static int mdt_readpage(struct mdt_thread_info *info)
890 struct mdt_object *object = info->mti_object;
891 struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
892 struct mdt_body *reqbody;
893 struct mdt_body *repbody;
898 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
901 reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
902 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
903 if (reqbody == NULL || repbody == NULL)
906 rc = mdt_init_ucred(info, reqbody);
911 * prepare @rdpg before calling lower layers and transfer itself. Here
912 * reqbody->size contains offset of where to start to read and
913 * reqbody->nlink contains number bytes to read.
915 rdpg->rp_hash = reqbody->size;
/* Detect truncation if rp_hash is narrower than the 64-bit wire value. */
916 if ((__u64)rdpg->rp_hash != reqbody->size) {
917 CERROR("Invalid hash: %#llx != %#llx\n",
918 (__u64)rdpg->rp_hash, reqbody->size);
919 GOTO(out, rc = -EFAULT);
921 rdpg->rp_count = reqbody->nlink;
922 rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
923 OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
924 if (rdpg->rp_pages == NULL)
925 GOTO(out, rc = -ENOMEM);
927 for (i = 0; i < rdpg->rp_npages; ++i) {
928 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
929 if (rdpg->rp_pages[i] == NULL)
930 GOTO(free_rdpg, rc = -ENOMEM);
933 /* call lower layers to fill allocated pages with directory data */
934 rc = mo_readpage(info->mti_env, mdt_object_child(object), rdpg);
942 /* send pages to client */
943 rc = mdt_sendpage(info, rdpg);
/* free_rdpg: release any pages that were allocated, then the array. */
948 for (i = 0; i < rdpg->rp_npages; i++)
949 if (rdpg->rp_pages[i] != NULL)
950 __free_pages(rdpg->rp_pages[i], 0);
951 OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
953 mdt_exit_ucred(info);
954 MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
957 mdt_exit_ucred(info);
958 return rc ? rc : rc1;
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
/* Common reint (metadata-modifying) path: size and pack the reply,
 * unpack the reint record, set up credentials, handle RESENT replay
 * via the client's last-xid bookkeeping, then dispatch to
 * mdt_reint_rec(). */
961 static int mdt_reint_internal(struct mdt_thread_info *info,
962 struct mdt_lock_handle *lhc,
965 struct req_capsule *pill = &info->mti_pill;
966 struct mdt_device *mdt = info->mti_mdt;
967 struct ptlrpc_request *req = mdt_info_req(info);
/* Reserve maximum-size buffers for striping EA and unlink cookies. */
972 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
973 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
974 mdt->mdt_max_mdsize);
975 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
976 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
977 mdt->mdt_max_cookiesize);
978 rc = req_capsule_pack(pill);
980 CERROR("Can't pack response, rc %d\n", rc);
985 * Check this after packing response, because after we fail here without
986 * allocating response, caller anyway may want to get ldlm_reply from it
989 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
992 rc = mdt_reint_unpack(info, op);
994 CERROR("Can't unpack reint, rc %d\n", rc);
998 rc = mdt_init_ucred_reint(info);
1002 rc = mdt_fix_attr_ucred(info, op);
/* RESENT: if we already executed this xid, reconstruct the old reply
 * instead of re-executing the modification. */
1006 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1007 struct mdt_client_data *mcd;
1009 mcd = req->rq_export->exp_mdt_data.med_mcd;
1010 if (mcd->mcd_last_xid == req->rq_xid ||
1011 mcd->mcd_last_close_xid == req->rq_xid) {
1012 mdt_reconstruct(info, lhc);
1013 rc = lustre_msg_get_status(req->rq_repmsg);
1016 DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
1019 rc = mdt_reint_rec(info, lhc);
1022 mdt_exit_ucred(info);
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
/* Read the reint opcode from the request and extend the capsule to the
 * matching request format from @fmt; returns the opcode (or an error
 * on an elided path). */
1026 static long mdt_reint_opcode(struct mdt_thread_info *info,
1027 const struct req_format **fmt)
1033 ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
1036 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
1037 if (opc < REINT_MAX && fmt[opc] != NULL)
1038 req_capsule_extend(&info->mti_pill, fmt[opc]);
1040 CERROR("Unsupported opc: %ld\n", opc);
/* MDS_REINT handler: dispatch by reint opcode to the common path. */
1045 static int mdt_reint(struct mdt_thread_info *info)
1050 static const struct req_format *reint_fmts[REINT_MAX] = {
1051 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
1052 [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
1053 [REINT_LINK] = &RQF_MDS_REINT_LINK,
1054 [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
1055 [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
1056 [REINT_OPEN] = &RQF_MDS_REINT_OPEN
1061 opc = mdt_reint_opcode(info, reint_fmts);
1064 * No lock possible here from client to pass it to reint code
1067 rc = mdt_reint_internal(info, NULL, opc);
1072 info->mti_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
1076 /* TODO these two methods not available now. */
1078 /* this should sync the whole device */
1079 static int mdt_device_sync(struct mdt_thread_info *info)
1084 /* this should sync this object */
1085 static int mdt_object_sync(struct mdt_thread_info *info)
/* MDS_SYNC handler: fid1 == 0 means sync the whole device, otherwise
 * sync the named object and return its (refreshed) attributes. */
1090 static int mdt_sync(struct mdt_thread_info *info)
1092 struct req_capsule *pill = &info->mti_pill;
1093 struct mdt_body *body;
1097 /* The fid may be zero, so we req_capsule_set manually */
1098 req_capsule_set(pill, &RQF_MDS_SYNC);
1100 body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1104 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
1107 if (fid_seq(&body->fid1) == 0) {
1108 /* sync the whole device */
1109 rc = req_capsule_pack(pill);
1111 rc = mdt_device_sync(info);
1113 /* sync an object */
1114 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS|HABEO_REFERO);
1116 rc = mdt_object_sync(info);
1118 struct md_object *next;
1119 const struct lu_fid *fid;
1120 struct lu_attr *la = &info->mti_attr.ma_attr;
1122 next = mdt_object_child(info->mti_object);
1123 info->mti_attr.ma_need = MA_INODE;
1124 rc = mo_attr_get(info->mti_env, next,
1127 body = req_capsule_server_get(pill,
1129 fid = mdt_object_fid(info->mti_object);
1130 mdt_pack_attr2body(body, la, fid);
1131 mdt_body_reverse_idmap(info, body);
/* NOTE(review): lossy extract — function bodies elided; code verbatim. */
/* Quota handlers: bodies elided in this extract (likely stubs here). */
1139 static int mdt_quotacheck_handle(struct mdt_thread_info *info)
1144 static int mdt_quotactl_handle(struct mdt_thread_info *info)
/* Renew a client's MDS capability for the current object: snapshot the
 * capa key and refresh the capability from the lower layer. */
1149 static int mdt_renew_capa(struct mdt_thread_info *info)
1151 struct mdt_device *mdt = info->mti_mdt;
1152 struct mdt_object *obj = info->mti_object;
1153 struct mdt_body *body;
1154 struct lustre_capa *capa;
1158 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
1161 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
1164 spin_lock(&capa_lock);
/* NOTE(review): "red_capa_key" looks garbled (cf. same call in
 * mdt_getattr_internal) — verify identifier against full source. */
1165 info->mti_capa_key = *red_capa_key(mdt);
1166 spin_unlock(&capa_lock);
1168 *capa = obj->mot_header.loh_capa;
1169 /* TODO: add capa check */
1170 rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa);
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
1178 * OBD PING and other handlers.
/* OBD_PING: delegate to generic target ping. */
1180 static int mdt_obd_ping(struct mdt_thread_info *info)
1184 rc = target_handle_ping(mdt_info_req(info));
/* Log-cancel / quota-check callbacks: bodies elided in this extract. */
1188 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
1193 static int mdt_obd_qc_callback(struct mdt_thread_info *info)
/* AST callbacks handed to the DLM for server-side lock enqueue. */
1203 static struct ldlm_callback_suite cbs = {
1204 .lcs_completion = ldlm_server_completion_ast,
1205 .lcs_blocking = ldlm_server_blocking_ast,
/* LDLM_ENQUEUE handler: hand the pre-unpacked dlm request to the DLM. */
1209 static int mdt_enqueue(struct mdt_thread_info *info)
1211 struct ptlrpc_request *req;
1215 * info->mti_dlm_req already contains swapped and (if necessary)
1216 * converted dlm request.
1218 LASSERT(info->mti_dlm_req != NULL);
1220 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE)) {
1221 info->mti_fail_id = OBD_FAIL_LDLM_ENQUEUE;
1225 req = mdt_info_req(info);
1226 rc = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
1227 req, info->mti_dlm_req, &cbs);
1228 info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
1229 return rc ? : req->rq_status;
/* LDLM_CONVERT handler. */
1232 static int mdt_convert(struct mdt_thread_info *info)
1235 struct ptlrpc_request *req;
1237 LASSERT(info->mti_dlm_req);
1238 req = mdt_info_req(info);
1239 rc = ldlm_handle_convert0(req, info->mti_dlm_req);
1240 return rc ? : req->rq_status;
/* Blocking/completion ASTs are client-side; receiving one here is a bug. */
1243 static int mdt_bl_callback(struct mdt_thread_info *info)
1245 CERROR("bl callbacks should not happen on MDS\n");
1250 static int mdt_cp_callback(struct mdt_thread_info *info)
1252 CERROR("cp callbacks should not happen on MDS\n");
1258 * sec context handlers
/* SEC_CTX_* handler: (re)establish the client's identity mapping. */
1260 static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
1262 return mdt_handle_idmap(info);
/* NOTE(review): lossy extract — some lines elided; code kept verbatim. */
/* Downcast lu_object -> mdt_object (asserted). */
1265 static struct mdt_object *mdt_obj(struct lu_object *o)
1267 LASSERT(lu_device_is_mdt(o->lo_dev));
1268 return container_of0(o, struct mdt_object, mot_obj.mo_lu);
/* Look up (or instantiate) the mdt_object for fid @f, passing the
 * client capability @c (likely bypassed when MDS capas are disabled —
 * that branch's body is elided here). */
1271 struct mdt_object *mdt_object_find(const struct lu_env *env,
1272 struct mdt_device *d,
1273 const struct lu_fid *f,
1274 struct lustre_capa *c)
1276 struct lu_object *o;
1277 struct mdt_object *m;
1280 if (!d->mdt_opts.mo_mds_capa)
1283 o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f, c);
1285 m = (struct mdt_object *)o;
/* Take an inodebits DLM lock on @o's fid resource with bits @ibits in
 * mode lh->mlh_mode; the handle is stored in @lh. */
1291 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
1292 struct mdt_lock_handle *lh, __u64 ibits)
1294 ldlm_policy_data_t *policy = &info->mti_policy;
1295 struct ldlm_res_id *res_id = &info->mti_res_id;
1296 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1300 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1301 LASSERT(lh->mlh_mode != LCK_MINMODE);
/* Negative lu_object_exists => cross-ref object: only LOOKUP allowed. */
1302 if (lu_object_exists(&o->mot_obj.mo_lu) < 0) {
1303 LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
1304 LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1306 policy->l_inodebits.bits = ibits;
1308 rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode,
1313 /* lock with cross-ref fixes */
/* As mdt_object_lock(), but for cross-ref objects swap UPDATE for
 * LOOKUP so the assertions above hold. */
1314 int mdt_object_cr_lock(struct mdt_thread_info *info, struct mdt_object *o,
1315 struct mdt_lock_handle *lh, __u64 ibits)
1317 if (lu_object_exists(&o->mot_obj.mo_lu) < 0) {
1318 /* cross-ref object fix */
1319 ibits &= ~MDS_INODELOCK_UPDATE;
1320 ibits |= MDS_INODELOCK_LOOKUP;
1322 return mdt_object_lock(info, o, lh, ibits);
1326 * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
1327 * to save this lock in req. when transaction committed, req will be released,
1328 * and lock will, too.
1330 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
1331 struct mdt_lock_handle *lh, int decref)
1333 struct ptlrpc_request *req = mdt_info_req(info);
1334 struct lustre_handle *handle = &lh->mlh_lh;
1335 ldlm_mode_t mode = lh->mlh_mode;
1338 if (lustre_handle_is_used(handle)) {
1340 fid_unlock(mdt_object_fid(o), handle, mode);
1342 ptlrpc_save_lock(req, handle, mode);
/* Look up the object for FID @f and lock it with @ibits in one step.
 * On lock failure the object reference is dropped (mdt_object_put below);
 * the success path presumably returns the locked object — the error-return
 * lines are not visible in this listing. */
1348 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
1349 const struct lu_fid *f,
1350 struct mdt_lock_handle *lh,
1352 struct lustre_capa *capa)
1354 struct mdt_object *o;
1356 o = mdt_object_find(info->mti_env, info->mti_mdt, f, capa);
1360 rc = mdt_object_lock(info, o, lh, ibits);
1362 mdt_object_put(info->mti_env, o);
/* Convenience inverse of mdt_object_find_lock(): unlock then drop the
 * object reference. */
1369 void mdt_object_unlock_put(struct mdt_thread_info * info,
1370 struct mdt_object * o,
1371 struct mdt_lock_handle *lh,
1374 mdt_object_unlock(info, o, lh, decref)
1375 mdt_object_put(info->mti_env, o);
/* Map an RPC opcode to its handler by scanning the slice table @supported.
 * Each slice covers a contiguous [mos_opc_start, mos_opc_end) range and
 * indexes directly into its mos_hs handler array.  Returns NULL-equivalent
 * for an unsupported opcode (h = NULL branch below). */
1378 static struct mdt_handler *mdt_handler_find(__u32 opc,
1379 struct mdt_opc_slice *supported)
1381 struct mdt_opc_slice *s;
1382 struct mdt_handler *h;
1385 for (s = supported; s->mos_hs != NULL; s++) {
1386 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1387 h = s->mos_hs + (opc - s->mos_opc_start);
/* Table must be dense: slot position encodes the opcode. */
1389 LASSERT(h->mh_opc == opc);
1391 h = NULL; /* unsupported opc */
/* Last xid recorded in this export's on-disk client data (little-endian
 * on disk, hence the le64_to_cpu). */
1398 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
1400 return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid);
/* Same as req_exp_last_xid() but for the separately-tracked close xid
 * (close RPCs use their own slot in the client data). */
1403 static inline __u64 req_exp_last_close_xid(struct ptlrpc_request *req)
1405 return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_close_xid);
/* Placeholder for converting old-style lock resource names in an incoming
 * DLM request to the current format; intentionally unimplemented. */
1408 static int mdt_lock_resname_compat(struct mdt_device *m,
1409 struct ldlm_request *req)
1411 /* XXX something... later. */
/* Placeholder for the reply-side counterpart of mdt_lock_resname_compat();
 * intentionally unimplemented. */
1415 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
1417 /* XXX something... later. */
1422 * Generic code handling requests that have struct mdt_body passed in:
1424 * - extract mdt_body from request and save it in @info, if present;
1426 * - create lu_object, corresponding to the fid in mdt_body, and save it in
1429 * - if HABEO_CORPUS flag is set for this request type check whether object
1430 * actually exists on storage (lu_object_exists()).
1433 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1435 struct lustre_capa *capa = NULL;
1436 const struct mdt_body *body;
1437 struct mdt_object *obj;
1438 const struct lu_env *env;
1439 struct req_capsule *pill;
1442 env = info->mti_env;
1443 pill = &info->mti_pill;
/* Stash the client body in mti_body for later handlers. */
1445 body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
/* Reject malformed FIDs before touching the object index. */
1449 if (!fid_is_sane(&body->fid1)) {
1450 CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
1455 * Dot not get size or any capa fields before we check that request
1456 * contains capa actually. There are some requests which do not, for
1457 * instance MDS_IS_SUBDIR.
1459 if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_CLIENT))
1460 capa = req_capsule_client_get(pill, &RMF_CAPA1);
1462 obj = mdt_object_find(env, info->mti_mdt, &body->fid1, capa);
/* HABEO_CORPUS handlers require the object to exist on storage; drop the
 * reference on failure (the error return is not visible in this listing). */
1464 if ((flags & HABEO_CORPUS) &&
1465 !lu_object_exists(&obj->mot_obj.mo_lu)) {
1466 mdt_object_put(env, obj);
1469 info->mti_object = obj;
/* Unpack the request body (if the format has one) and, for HABEO_REFERO
 * handlers, size the variable-length reply fields (LOV EA, llog cookies)
 * from the device maxima before packing the reply capsule. */
1478 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1480 struct req_capsule *pill;
1484 pill = &info->mti_pill;
1486 if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
1487 rc = mdt_body_unpack(info, flags);
1491 if (rc == 0 && (flags & HABEO_REFERO)) {
1492 struct mdt_device *mdt = info->mti_mdt;
/* Reserve worst-case buffer sizes; actual reply may be shrunk later. */
1494 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1495 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1496 mdt->mdt_max_mdsize);
1497 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1498 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1499 mdt->mdt_max_cookiesize);
1501 rc = req_capsule_pack(pill);
/* Context key for per-transaction MDT state; defined here, used elsewhere. */
1507 struct lu_context_key mdt_txn_key;
/* Finalize transno/last_committed bookkeeping in the reply message.
 * Skipped when the reply was never packed or the handler flagged
 * MDT_NONEED_TRANSNO. */
1508 static inline void mdt_finish_reply(struct mdt_thread_info *info, int rc)
1510 struct mdt_device *mdt = info->mti_mdt;
1511 struct ptlrpc_request *req = mdt_info_req(info);
1512 struct obd_export *exp = req->rq_export;
1514 /* sometimes the reply message has not been successfully packed */
1515 if (mdt == NULL || req == NULL || req->rq_repmsg == NULL)
1518 if (info->mti_trans_flags & MDT_NONEED_TRANSNO)
1521 /*XXX: assert on this when all code will be finished */
/* A failed request must not carry a transno; scrub it and complain. */
1522 if (rc != 0 && info->mti_transno != 0) {
1523 info->mti_transno = 0;
1524 CERROR("Transno is not 0 while rc is %i!\n", rc);
1527 CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1528 info->mti_transno, exp->exp_obd->obd_last_committed);
/* transno and last_committed are published under mdt_transno_lock so
 * clients always see a consistent pair. */
1530 spin_lock(&mdt->mdt_transno_lock);
1531 req->rq_transno = info->mti_transno;
1532 lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
1534 target_committed_to_req(req);
1536 spin_unlock(&mdt->mdt_transno_lock);
1537 lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
1538 //lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
1544 * Invoke handler for this request opc. Also do necessary preprocessing
1545 * (according to handler ->mh_flags), and post-processing (setting of
1546 * ->last_{xid,committed}).
1548 static int mdt_req_handle(struct mdt_thread_info *info,
1549 struct mdt_handler *h, struct ptlrpc_request *req)
1556 LASSERT(h->mh_act != NULL);
1557 LASSERT(h->mh_opc == lustre_msg_get_opc(req->rq_reqmsg));
/* No transaction may be open across a request boundary. */
1558 LASSERT(current->journal_info == NULL);
1560 DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1563 * Do not use *_FAIL_CHECK_ONCE() macros, because they will stop
1564 * correct handling of failed req later in ldlm due to doing
1565 * obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED without actually
1566 * correct actions like it is done in target_send_reply_msg().
1568 if (h->mh_fail_id != 0) {
1570 * Set to info->mti_fail_id to handler fail_id, it will be used
1571 * later, and better than use default fail_id.
1573 if (OBD_FAIL_CHECK(h->mh_fail_id)) {
1574 info->mti_fail_id = h->mh_fail_id;
1580 flags = h->mh_flags;
/* Handlers that touch the body or pack a reply must declare a format. */
1581 LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
1583 if (h->mh_fmt != NULL) {
1584 req_capsule_set(&info->mti_pill, h->mh_fmt);
1585 rc = mdt_unpack_req_pack_rep(info, flags);
/* Mutating ops are refused on a read-only connection (error value is on
 * a line dropped from this listing). */
1588 if (rc == 0 && flags & MUTABOR &&
1589 req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
/* HABEO_CLAVIS: handler operates on a DLM request; unpack it and apply
 * resource-name compatibility conversion if configured. */
1592 if (rc == 0 && flags & HABEO_CLAVIS) {
1593 struct ldlm_request *dlm_req;
1595 LASSERT(h->mh_fmt != NULL);
1597 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
1598 if (dlm_req != NULL) {
1599 if (info->mti_mdt->mdt_opts.mo_compat_resname)
1600 rc = mdt_lock_resname_compat(info->mti_mdt,
1602 info->mti_dlm_req = dlm_req;
1604 CERROR("Can't unpack dlm request\n");
/* Dispatch to the per-opcode action. */
1613 rc = h->mh_act(info);
1615 req->rq_status = rc;
1618 * It is not correct to zero @rc out here unconditionally. First of all,
1619 * for error cases, we do not need target_committed_to_req(req). Second
1620 * reason is that, @rc is passed to target_send_reply() and used for
1621 * figuring out what should be done about reply in capricular case. We
1622 * only zero it out for ELDLM_* codes which > 0 because they do not
1623 * support invariant of marking req as difficult only in case of error.
1628 LASSERT(current->journal_info == NULL);
1630 if (rc == 0 && (flags & HABEO_CLAVIS)
1631 && info->mti_mdt->mdt_opts.mo_compat_resname) {
1632 struct ldlm_reply *dlmrep;
1634 dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1636 rc = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
1639 /* If we're DISCONNECTing, the mdt_export_data is already freed */
1640 if (rc == 0 && h->mh_opc != MDS_DISCONNECT)
1641 target_committed_to_req(req);
/* Reset a lock handle to the "unused" state (zero cookie, no mode). */
1646 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1648 lh->mlh_lh.cookie = 0ull;
1649 lh->mlh_mode = LCK_MINMODE;
/* Sanity check at teardown: the handle must have been released already. */
1652 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1654 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
/* Prepare the per-thread info for one incoming request: zero everything,
 * mark reply buffer sizes as "unset" (-1), reset all lock-handle slots,
 * and attach the request's env/transno/device before capsule init. */
1657 static void mdt_thread_info_init(struct ptlrpc_request *req,
1658 struct mdt_thread_info *info)
1662 memset(info, 0, sizeof(*info));
1664 info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1665 for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1666 info->mti_rep_buf_size[i] = -1;
1668 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1669 mdt_lock_handle_init(&info->mti_lh[i]);
/* Default fail id; individual handlers may override via mh_fail_id. */
1671 info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1672 info->mti_env = req->rq_svc_thread->t_env;
1673 info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1675 /* it can be NULL while CONNECT */
1677 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1678 req_capsule_init(&info->mti_pill, req, RCL_SERVER,
1679 info->mti_rep_buf_size);
/* Tear down per-request state: finish the capsule, drop the object
 * reference taken by mdt_body_unpack() (if any), and verify every lock
 * handle slot was released. */
1682 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1686 req_capsule_fini(&info->mti_pill);
1687 if (info->mti_object != NULL) {
1688 mdt_object_put(info->mti_env, info->mti_object);
1689 info->mti_object = NULL;
1691 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1692 mdt_lock_handle_fini(&info->mti_lh[i]);
/* Borrowed from the old MDS: decides whether a request may proceed
 * during recovery. */
1696 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
1697 struct obd_device *obd, int *process);
1699 * Handle recovery. Return:
1700 * +1: continue request processing;
1701 * -ve: abort immediately with the given error code;
1702 * 0: send reply with error code in req->rq_status;
1704 static int mdt_recovery(struct mdt_thread_info *info)
1706 struct ptlrpc_request *req = mdt_info_req(info);
1709 struct obd_device *obd;
/* Security-context RPCs are handled before any export checks. */
1713 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1716 case SEC_CTX_INIT_CONT:
1718 mdt_handle_idmap(info);
/* Everything except connect-class RPCs needs an established export. */
1722 if (req->rq_export == NULL) {
1723 CERROR("operation %d on unconnected MDS from %s\n",
1724 lustre_msg_get_opc(req->rq_reqmsg),
1725 libcfs_id2str(req->rq_peer));
1726 req->rq_status = -ENOTCONN;
1730 /* sanity check: if the xid matches, the request must be marked as a
1731 * resent or replayed */
1732 if (req->rq_xid == req_exp_last_xid(req) ||
1733 req->rq_xid == req_exp_last_close_xid(req)) {
1734 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
1735 (MSG_RESENT | MSG_REPLAY))) {
1736 CERROR("rq_xid "LPU64" matches last_xid, "
1737 "expected RESENT flag\n", req->rq_xid);
1738 req->rq_status = -ENOTCONN;
1743 /* else: note the opposite is not always true; a RESENT req after a
1744 * failover will usually not match the last_xid, since it was likely
1745 * never committed. A REPLAYed request will almost never match the
1746 * last xid, however it could for a committed, but still retained,
1749 obd = req->rq_export->exp_obd;
1751 /* Check for aborted recovery... */
/* _bh variant: these flags are also touched from softirq context. */
1752 spin_lock_bh(&obd->obd_processing_task_lock);
1753 abort_recovery = obd->obd_abort_recovery;
1754 recovering = obd->obd_recovering;
1755 spin_unlock_bh(&obd->obd_processing_task_lock);
1756 if (abort_recovery) {
1757 target_abort_recovery(obd);
1758 } else if (recovering) {
1762 rc = mds_filter_recovery_request(req, obd, &should_process);
1763 if (rc != 0 || !should_process)
/* Send the reply for a processed request.  LAST_REPLAY requests during
 * recovery are queued for the final reply instead of being answered
 * immediately; if recovery already finished, the race is resolved by
 * returning -ENOTCONN through the normal error path. */
1769 static int mdt_reply(struct ptlrpc_request *req, int rc,
1770 struct mdt_thread_info *info)
1772 struct obd_device *obd;
1775 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1776 if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1777 DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1779 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1780 if (obd && obd->obd_recovering) {
1781 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1782 RETURN(target_queue_final_reply(req, rc));
1785 * Lost a race with recovery; let the error path DTRT.
1787 rc = req->rq_status = -ENOTCONN;
1790 target_send_reply(req, rc, info->mti_fail_id);
/* Old-MDS version compatibility check, implemented elsewhere. */
1795 extern int mds_msg_check_version(struct lustre_msg *msg);
/* Top-level dispatch for one request: version check, recovery gating,
 * handler lookup by opcode, execution, and reply.  Unsupported opcodes
 * get -ENOTSUPP via ptlrpc_error(). */
1797 static int mdt_handle0(struct ptlrpc_request *req,
1798 struct mdt_thread_info *info,
1799 struct mdt_opc_slice *supported)
1801 struct mdt_handler *h;
1802 struct lustre_msg *msg;
1807 MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1809 LASSERT(current->journal_info == NULL);
1811 msg = req->rq_reqmsg;
1812 rc = mds_msg_check_version(msg);
/* mdt_recovery() result gating is on lines dropped from this listing;
 * presumably only rc > 0 ("continue") reaches the handler dispatch. */
1814 rc = mdt_recovery(info);
1816 h = mdt_handler_find(lustre_msg_get_opc(msg),
1819 rc = mdt_req_handle(info, h, req);
1821 req->rq_status = -ENOTSUPP;
1822 rc = ptlrpc_error(req);
1825 rc = mdt_reply(req, rc, info);
1828 CERROR(LUSTRE_MDT_NAME" drops mal-formed request\n");
1833 * MDT handler function called by ptlrpc service thread when request comes.
1835 * XXX common "target" functionality should be factored into separate module
1836 * shared by mdt, ost and stand-alone services like fld.
1838 static int mdt_handle_common(struct ptlrpc_request *req,
1839 struct mdt_opc_slice *supported)
1842 struct mdt_thread_info *info;
/* The service thread's lu_env carries the per-thread mdt_thread_info via
 * mdt_thread_key; init it for this request and tear it down afterwards. */
1846 env = req->rq_svc_thread->t_env;
1847 LASSERT(env != NULL);
1848 LASSERT(env->le_ses != NULL);
1849 LASSERT(env->le_ctx.lc_thread == req->rq_svc_thread);
1850 info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
1851 LASSERT(info != NULL);
1853 mdt_thread_info_init(req, info);
1855 rc = mdt_handle0(req, info, supported);
1857 mdt_thread_info_fini(info);
1862 * This is called from recovery code as handler of _all_ RPC types, FLD and SEQ
/* During recovery one entry point must route every RPC class to the right
 * handler table; default (regular) covers ordinary metadata ops. */
1865 static int mdt_recovery_handle(struct ptlrpc_request *req)
1870 switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1872 rc = mdt_handle_common(req, mdt_fld_handlers);
1875 rc = mdt_handle_common(req, mdt_seq_handlers);
1878 rc = mdt_handle_common(req, mdt_regular_handlers);
/* Service callback: regular metadata RPC portal. */
1885 static int mdt_regular_handle(struct ptlrpc_request *req)
1887 return mdt_handle_common(req, mdt_regular_handlers);
/* Service callback: readpage (readdir bulk) portal. */
1890 static int mdt_readpage_handle(struct ptlrpc_request *req)
1892 return mdt_handle_common(req, mdt_readpage_handlers);
/* Service callback: sequence-controller portal (shares the seq table). */
1895 static int mdt_mdsc_handle(struct ptlrpc_request *req)
1897 return mdt_handle_common(req, mdt_seq_handlers);
/* Service callback: metadata sequence-server portal. */
1900 static int mdt_mdss_handle(struct ptlrpc_request *req)
1902 return mdt_handle_common(req, mdt_seq_handlers);
/* Service callback: data sequence-server portal (same seq handler table). */
1905 static int mdt_dtss_handle(struct ptlrpc_request *req)
1907 return mdt_handle_common(req, mdt_seq_handlers);
/* Service callback: FID-location-database (FLD) portal. */
1910 static int mdt_fld_handle(struct ptlrpc_request *req)
1912 return mdt_handle_common(req, mdt_fld_handlers);
/* Forward declarations for the intent policy actions.  NOTE(review): the
 * parameter-name lines were dropped by this listing ("struct ldlm_lock **,"
 * has no trailing parameter); the full prototypes match the definitions
 * further below. */
1928 static int mdt_intent_getattr(enum mdt_it_code opcode,
1929 struct mdt_thread_info *info,
1930 struct ldlm_lock **,
1932 static int mdt_intent_reint(enum mdt_it_code opcode,
1933 struct mdt_thread_info *info,
1934 struct ldlm_lock **,
/* Per-intent dispatch table: request format, preprocessing flags, action
 * callback and (for reint intents) the expected reint opcode.  Several
 * designators (e.g. for the open/create entries) fall on lines dropped
 * from this listing. */
1937 static struct mdt_it_flavor {
1938 const struct req_format *it_fmt;
1940 int (*it_act)(enum mdt_it_code ,
1941 struct mdt_thread_info *,
1942 struct ldlm_lock **,
1945 } mdt_it_flavor[] = {
1947 .it_fmt = &RQF_LDLM_INTENT,
1948 /*.it_flags = HABEO_REFERO,*/
1950 .it_act = mdt_intent_reint,
1951 .it_reint = REINT_OPEN
1954 .it_fmt = &RQF_LDLM_INTENT,
1955 .it_flags = MUTABOR,
1956 .it_act = mdt_intent_reint,
1957 .it_reint = REINT_OPEN
1960 .it_fmt = &RQF_LDLM_INTENT,
1961 .it_flags = MUTABOR,
1962 .it_act = mdt_intent_reint,
1963 .it_reint = REINT_CREATE
1965 [MDT_IT_GETATTR] = {
1966 .it_fmt = &RQF_LDLM_INTENT_GETATTR,
1967 .it_flags = HABEO_REFERO,
1968 .it_act = mdt_intent_getattr
1970 [MDT_IT_READDIR] = {
1976 .it_fmt = &RQF_LDLM_INTENT_GETATTR,
1977 .it_flags = HABEO_REFERO,
1978 .it_act = mdt_intent_getattr
1981 .it_fmt = &RQF_LDLM_INTENT_UNLINK,
1982 .it_flags = MUTABOR,
1983 .it_act = NULL, /* XXX can be mdt_intent_reint, ? */
1984 .it_reint = REINT_UNLINK
1988 .it_flags = MUTABOR,
1991 [MDT_IT_GETXATTR] = {
/* Replace the server-side enqueue lock *lockp with the lock actually
 * granted while executing the intent (@new_lock, or the lock behind
 * lh->mlh_lh).  Handles the resent/reconstructed-reply case where the
 * lock was already given to this client, and fixes ownership fields so
 * the client becomes the lock's holder.  Returns ELDLM_LOCK_REPLACED. */
1998 int mdt_intent_lock_replace(struct mdt_thread_info *info,
1999 struct ldlm_lock **lockp,
2000 struct ldlm_lock *new_lock,
2001 struct mdt_lock_handle *lh,
2004 struct ptlrpc_request *req = mdt_info_req(info);
2005 struct ldlm_lock *lock = *lockp;
2008 * Get new lock only for cases when possible resent did not find any
2011 if (new_lock == NULL)
2012 new_lock = ldlm_handle2lock(&lh->mlh_lh);
/* INTENT_ONLY enqueue may legitimately have no lock to hand back. */
2014 if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
2017 LASSERTF(new_lock != NULL,
2018 "lockh "LPX64"\n", lh->mlh_lh.cookie);
2021 * If we've already given this lock to a client once, then we should
2022 * have no readers or writers. Otherwise, we should have one reader
2023 * _or_ writer ref (which will be zeroed below) before returning the
2026 if (new_lock->l_export == req->rq_export) {
2027 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2029 LASSERT(new_lock->l_export == NULL);
2030 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2035 if (new_lock->l_export == req->rq_export) {
2037 * Already gave this to the client, which means that we
2038 * reconstructed a reply.
2040 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2042 RETURN(ELDLM_LOCK_REPLACED);
2045 /* Fixup the lock to be given to the client */
2046 lock_res_and_lock(new_lock);
2047 new_lock->l_readers = 0;
2048 new_lock->l_writers = 0;
/* Transfer ownership to the requesting export and copy the client-facing
 * callbacks/remote handle from the original enqueue lock. */
2050 new_lock->l_export = class_export_get(req->rq_export);
2051 list_add(&new_lock->l_export_chain,
2052 &new_lock->l_export->exp_ldlm_data.led_held_locks);
2054 new_lock->l_blocking_ast = lock->l_blocking_ast;
2055 new_lock->l_completion_ast = lock->l_completion_ast;
2056 new_lock->l_remote_handle = lock->l_remote_handle;
2057 new_lock->l_flags &= ~LDLM_FL_LOCAL;
2059 unlock_res_and_lock(new_lock);
2060 LDLM_LOCK_PUT(new_lock);
2061 lh->mlh_lh.cookie = 0;
2063 RETURN(ELDLM_LOCK_REPLACED);
/* For a MSG_RESENT request, search the export's held locks for one whose
 * remote handle matches the client's lock_handle1.  If found, restore its
 * cookie/mode into @lh and return it via @old_lock; otherwise, if the xid
 * does not match a recorded last xid either, clear MSG_RESENT so the
 * request is processed as new. */
2066 static void mdt_fixup_resent(struct req_capsule *pill,
2067 struct ldlm_lock *new_lock,
2068 struct ldlm_lock **old_lock,
2069 struct mdt_lock_handle *lh)
2071 struct ptlrpc_request *req = pill->rc_req;
2072 struct obd_export *exp = req->rq_export;
2073 struct lustre_handle remote_hdl;
2074 struct ldlm_request *dlmreq;
2075 struct list_head *iter;
2077 if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2080 dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
2081 remote_hdl = dlmreq->lock_handle1;
/* led_lock protects the export's held-locks list during the scan. */
2083 spin_lock(&exp->exp_ldlm_data.led_lock);
2084 list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2085 struct ldlm_lock *lock;
2086 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
/* Skip the lock being enqueued right now. */
2087 if (lock == new_lock)
2089 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2090 lh->mlh_lh.cookie = lock->l_handle.h_cookie;
2091 lh->mlh_mode = lock->l_granted_mode;
2093 LDLM_DEBUG(lock, "restoring lock cookie");
2094 DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
/* Reference handed to the caller; released by them later. */
2097 *old_lock = LDLM_LOCK_GET(lock);
2098 spin_unlock(&exp->exp_ldlm_data.led_lock);
2102 spin_unlock(&exp->exp_ldlm_data.led_lock);
2105 * If the xid matches, then we know this is a resent request, and allow
2106 * it. (It's probably an OPEN, for which we don't send a lock.
2108 if (req->rq_xid == req_exp_last_xid(req))
2111 if (req->rq_xid == req_exp_last_close_xid(req))
2115 * This remote handle isn't enqueued, so we never received or processed
2116 * this request. Clear MSG_RESENT, because it can be handled like any
2117 * normal request now.
2119 lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2121 DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
/* Intent policy action for lookup/getattr intents: pick the child lock
 * bits from the intent code, perform the locked getattr-by-name, and on
 * success hand the child lock back to the client via
 * mdt_intent_lock_replace().  Negative lookups and errors abort the lock
 * (ELDLM_LOCK_ABORTED) with the result in lock_policy_res2. */
2125 static int mdt_intent_getattr(enum mdt_it_code opcode,
2126 struct mdt_thread_info *info,
2127 struct ldlm_lock **lockp,
2130 struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2131 struct ldlm_lock *new_lock = NULL;
2133 struct ldlm_reply *ldlm_rep;
2134 struct ptlrpc_request *req;
2135 struct mdt_body *reqbody;
/* Plain lookup needs only the name; getattr also needs the inode bits. */
2142 child_bits = MDS_INODELOCK_LOOKUP;
2144 case MDT_IT_GETATTR:
2145 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
2148 CERROR("Unhandled till now");
2152 reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
2153 if (reqbody == NULL)
/* Run the operation under the client's credentials. */
2156 rc = mdt_init_ucred(info, reqbody);
2160 req = info->mti_pill.rc_req;
2161 ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2162 mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
2164 /* Get lock from request for possible resent case. */
2165 mdt_fixup_resent(&info->mti_pill, *lockp, &new_lock, lhc);
2167 ldlm_rep->lock_policy_res2 =
2168 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
2169 mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1, 1, 0);
/* A negative lookup is a successful answer, not an error. */
2171 if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
2172 ldlm_rep->lock_policy_res2 = 0;
2173 if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
2174 ldlm_rep->lock_policy_res2) {
2175 GOTO(out, rc = ELDLM_LOCK_ABORTED);
2178 rc = mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
2181 mdt_exit_ucred(info);
/* Intent policy action for reint intents (open/create): unpack the reint
 * opcode, verify it matches the intent's expected it_reint, execute the
 * reint, and either return the cross-ref lock to the client (-EREMOTE) or
 * abort the lock with the reint result in lock_policy_res2. */
2185 static int mdt_intent_reint(enum mdt_it_code opcode,
2186 struct mdt_thread_info *info,
2187 struct ldlm_lock **lockp,
2190 struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2191 struct ldlm_reply *rep;
/* Only these reint codes are legal inside an intent. */
2195 static const struct req_format *intent_fmts[REINT_MAX] = {
2196 [REINT_CREATE] = &RQF_LDLM_INTENT_CREATE,
2197 [REINT_OPEN] = &RQF_LDLM_INTENT_OPEN
2202 opc = mdt_reint_opcode(info, intent_fmts);
/* Client-supplied reint code must agree with the intent flavor. */
2206 if (mdt_it_flavor[opcode].it_reint != opc) {
2207 CERROR("Reint code %ld doesn't match intent: %d\n",
2212 /* Get lock from request for possible resent case. */
2213 mdt_fixup_resent(&info->mti_pill, *lockp, NULL, lhc);
2215 rc = mdt_reint_internal(info, lhc, opc);
2217 rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2221 /* MDC expects this in any case */
2223 mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
2225 rep->lock_policy_res2 = rc;
2227 /* cross-ref case, the lock should be returned to the client */
2228 if (rc == -EREMOTE) {
2229 LASSERT(lustre_handle_is_used(&lhc->mlh_lh));
2230 rep->lock_policy_res2 = 0;
2231 RETURN(mdt_intent_lock_replace(info, lockp, NULL, lhc, flags));
2233 rep->lock_policy_res2 = rc;
2235 RETURN(ELDLM_LOCK_ABORTED);
/* Translate an LDLM intent opcode (IT_*) into the MDT_IT_* index used by
 * the mdt_it_flavor table; unknown codes are logged.  NOTE(review): most
 * case labels fall on lines dropped from this listing. */
2238 static int mdt_intent_code(long itcode)
2246 case IT_OPEN|IT_CREAT:
2253 rc = MDT_IT_READDIR;
2256 rc = MDT_IT_GETATTR;
2268 rc = MDT_IT_GETXATTR;
2271 CERROR("Unknown intent opcode: %ld\n", itcode);
/* Look up the flavor for an intent, extend the capsule to the intent's
 * request format, run the common unpack/pack step with the flavor's
 * flags (rejecting mutating intents on read-only connections), then
 * execute the flavor's policy action. */
2278 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
2279 struct ldlm_lock **lockp, int flags)
2281 struct req_capsule *pill;
2282 struct mdt_it_flavor *flv;
2287 opc = mdt_intent_code(itopc);
2291 pill = &info->mti_pill;
2292 flv = &mdt_it_flavor[opc];
2294 if (flv->it_fmt != NULL)
2295 req_capsule_extend(pill, flv->it_fmt);
2297 rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
2299 struct ptlrpc_request *req = mdt_info_req(info);
/* Read-only export may not execute a MUTABOR intent (the error value is
 * on a line dropped from this listing). */
2300 if (flv->it_flags & MUTABOR &&
2301 req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
2304 if (rc == 0 && flv->it_act != NULL) {
2305 /* execute policy */
2306 rc = flv->it_act(opc, info, lockp, flags);
/* LDLM namespace intent-policy callback: if the enqueue carries an intent
 * record, dispatch it through mdt_intent_opc(); otherwise pack a plain
 * enqueue reply. */
2312 static int mdt_intent_policy(struct ldlm_namespace *ns,
2313 struct ldlm_lock **lockp, void *req_cookie,
2314 ldlm_mode_t mode, int flags, void *data)
2316 struct mdt_thread_info *info;
2317 struct ptlrpc_request *req = req_cookie;
2318 struct ldlm_intent *it;
2319 struct req_capsule *pill;
2320 struct ldlm_lock *lock = *lockp;
2325 LASSERT(req != NULL);
/* Per-thread info was set up by mdt_handle_common() for this request. */
2327 info = lu_context_key_get(&req->rq_svc_thread->t_env->le_ctx,
2329 LASSERT(info != NULL);
2330 pill = &info->mti_pill;
2331 LASSERT(pill->rc_req == req);
/* An intent is present iff the request has a buffer past the DLM req. */
2333 if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
2334 req_capsule_extend(pill, &RQF_LDLM_INTENT);
2335 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
2337 LDLM_DEBUG(lock, "intent policy opc: %s\n",
2338 ldlm_it2str(it->opc));
2340 rc = mdt_intent_opc(it->opc, info, lockp, flags);
2346 /* No intent was provided */
2347 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
2348 rc = req_capsule_pack(pill);
/* Tear down the site's sequence machinery: server seq, controller seq,
 * and client seq, each finalized and freed if present. */
2356 static int mdt_seq_fini(const struct lu_env *env,
2357 struct mdt_device *m)
2359 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2362 if (ls && ls->ls_server_seq) {
2363 seq_server_fini(ls->ls_server_seq, env);
2364 OBD_FREE_PTR(ls->ls_server_seq);
2365 ls->ls_server_seq = NULL;
2368 if (ls && ls->ls_control_seq) {
2369 seq_server_fini(ls->ls_control_seq, env);
2370 OBD_FREE_PTR(ls->ls_control_seq);
2371 ls->ls_control_seq = NULL;
2374 if (ls && ls->ls_client_seq) {
2375 seq_client_fini(ls->ls_client_seq);
2376 OBD_FREE_PTR(ls->ls_client_seq);
2377 ls->ls_client_seq = NULL;
/* Initialize sequence services for this MDT.  Node 0 additionally hosts
 * the sequence controller and a local client talking to it; every node
 * runs a metadata sequence server, and node 0 wires its controller client
 * into that server.  All failures unwind through mdt_seq_fini(). */
2383 static int mdt_seq_init(const struct lu_env *env,
2385 struct mdt_device *m)
2392 ls = m->mdt_md_dev.md_lu_dev.ld_site;
2395 * This is sequence-controller node. Init seq-controller server on local
2398 if (ls->ls_node_id == 0) {
2399 LASSERT(ls->ls_control_seq == NULL);
2401 OBD_ALLOC_PTR(ls->ls_control_seq);
2402 if (ls->ls_control_seq == NULL)
2405 rc = seq_server_init(ls->ls_control_seq,
2406 m->mdt_bottom, uuid,
2407 LUSTRE_SEQ_CONTROLLER,
2411 GOTO(out_seq_fini, rc);
2413 OBD_ALLOC_PTR(ls->ls_client_seq);
2414 if (ls->ls_client_seq == NULL)
2415 GOTO(out_seq_fini, rc = -ENOMEM);
/* "ctl-<name>" prefix identifies this controller client in logs. */
2417 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2418 if (prefix == NULL) {
2419 OBD_FREE_PTR(ls->ls_client_seq);
2420 GOTO(out_seq_fini, rc = -ENOMEM);
2423 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2427 * Init seq-controller client after seq-controller server is
2428 * ready. Pass ls->ls_control_seq to it for direct talking.
2430 rc = seq_client_init(ls->ls_client_seq, NULL,
2431 LUSTRE_SEQ_METADATA, prefix,
2432 ls->ls_control_seq);
2433 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2436 GOTO(out_seq_fini, rc);
2439 /* Init seq-server on local MDT */
2440 LASSERT(ls->ls_server_seq == NULL);
2442 OBD_ALLOC_PTR(ls->ls_server_seq);
2443 if (ls->ls_server_seq == NULL)
2444 GOTO(out_seq_fini, rc = -ENOMEM);
2446 rc = seq_server_init(ls->ls_server_seq,
2447 m->mdt_bottom, uuid,
/* NOTE(review): error path maps any seq_server_init() failure to -ENOMEM,
 * discarding the real rc — looks like a pre-existing wart; confirm against
 * full source before changing. */
2451 GOTO(out_seq_fini, rc = -ENOMEM);
2453 /* Assign seq-controller client to local seq-server. */
2454 if (ls->ls_node_id == 0) {
2455 LASSERT(ls->ls_client_seq != NULL);
2457 rc = seq_server_set_cli(ls->ls_server_seq,
2465 mdt_seq_fini(env, m);
/* Establish an MDS-to-MDS connection through the given MDC device.
 * OBD_CONNECT_LCL_CLIENT marks it as a trusted local (server-side)
 * client.  The @ocd allocation lines were dropped by this listing. */
2470 static int mdt_md_connect(const struct lu_env *env,
2471 struct lustre_handle *conn,
2472 struct obd_device *mdc)
2474 struct obd_connect_data *ocd;
2480 /* The connection between MDS must be local */
2481 ocd->ocd_connect_flags |= OBD_CONNECT_LCL_CLIENT;
2482 rc = obd_connect(env, conn, mdc, &mdc->obd_uuid, ocd);
2489 * Init client sequence manager which is used by local MDS to talk to sequence
2490 * controller on remote node.
2492 static int mdt_seq_init_cli(const struct lu_env *env,
2493 struct mdt_device *m,
2494 struct lustre_cfg *cfg)
2496 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2497 struct obd_device *mdc;
2498 struct obd_uuid *uuidp, *mdcuuidp;
2499 char *uuid_str, *mdc_uuid_str;
2502 struct mdt_thread_info *info;
/* cfg buffer 2 carries the MDT index of the peer being added. */
2503 char *p, *index_string = lustre_cfg_string(cfg, 2);
2506 info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
2507 uuidp = &info->mti_u.uuid[0];
2508 mdcuuidp = &info->mti_u.uuid[1];
2510 LASSERT(index_string);
2512 index = simple_strtol(index_string, &p, 10);
2514 CERROR("Invalid index in lustre_cgf, offset 2\n");
2518 /* check if this is adding the first MDC and controller is not yet
/* Only the MDC pointing at MDT0 (the controller) is of interest, and
 * only if a client seq was not already created. */
2520 if (index != 0 || ls->ls_client_seq)
2523 uuid_str = lustre_cfg_string(cfg, 1);
2524 mdc_uuid_str = lustre_cfg_string(cfg, 4);
2525 obd_str2uuid(uuidp, uuid_str);
2526 obd_str2uuid(mdcuuidp, mdc_uuid_str);
2528 mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, mdcuuidp);
2530 CERROR("can't find controller MDC by uuid %s\n",
2533 } else if (!mdc->obd_set_up) {
2534 CERROR("target %s not set up\n", mdc->obd_name);
2537 struct lustre_handle conn = {0, };
2539 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
2540 mdc->obd_name, mdc->obd_uuid.uuid);
2542 rc = mdt_md_connect(env, &conn, mdc);
2544 CERROR("target %s connect error %d\n",
2547 ls->ls_control_exp = class_conn2export(&conn);
2549 OBD_ALLOC_PTR(ls->ls_client_seq);
2551 if (ls->ls_client_seq != NULL) {
2554 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2558 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
/* Remote controller: seq client talks over the export, not directly. */
2561 rc = seq_client_init(ls->ls_client_seq,
2563 LUSTRE_SEQ_METADATA,
2565 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2572 LASSERT(ls->ls_server_seq != NULL);
2574 rc = seq_server_set_cli(ls->ls_server_seq,
/* Undo mdt_seq_init_cli(): detach the controller client from the local
 * seq server and disconnect the controller export. */
2583 static void mdt_seq_fini_cli(struct mdt_device *m)
2590 ls = m->mdt_md_dev.md_lu_dev.ld_site;
2592 if (ls && ls->ls_server_seq)
2593 seq_server_set_cli(ls->ls_server_seq,
2596 if (ls && ls->ls_control_exp) {
2597 rc = obd_disconnect(ls->ls_control_exp);
2599 CERROR("failure to disconnect "
2602 ls->ls_control_exp = NULL;
/* Tear down the site's FLD server and client, freeing both if present. */
2610 static int mdt_fld_fini(const struct lu_env *env,
2611 struct mdt_device *m)
2613 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2616 if (ls && ls->ls_server_fld) {
2617 fld_server_fini(ls->ls_server_fld, env);
2618 OBD_FREE_PTR(ls->ls_server_fld);
2619 ls->ls_server_fld = NULL;
2622 if (ls && ls->ls_client_fld != NULL) {
2623 fld_client_fini(ls->ls_client_fld);
2624 OBD_FREE_PTR(ls->ls_client_fld);
2625 ls->ls_client_fld = NULL;
/* Initialize the FID-location database for this site: a local FLD server,
 * a DHT-hashed FLD client, and registration of the local server as one of
 * the client's targets.  Failures unwind through mdt_fld_fini(). */
2631 static int mdt_fld_init(const struct lu_env *env,
2633 struct mdt_device *m)
2635 struct lu_fld_target target;
2640 ls = m->mdt_md_dev.md_lu_dev.ld_site;
2642 OBD_ALLOC_PTR(ls->ls_server_fld);
2643 if (ls->ls_server_fld == NULL)
2644 RETURN(rc = -ENOMEM);
2646 rc = fld_server_init(ls->ls_server_fld,
2647 m->mdt_bottom, uuid, env);
2649 OBD_FREE_PTR(ls->ls_server_fld);
2650 ls->ls_server_fld = NULL;
2653 OBD_ALLOC_PTR(ls->ls_client_fld);
2654 if (!ls->ls_client_fld)
2655 GOTO(out_fld_fini, rc = -ENOMEM);
2657 rc = fld_client_init(ls->ls_client_fld, uuid,
2658 LUSTRE_CLI_FLD_HASH_DHT);
2660 CERROR("can't init FLD, err %d\n", rc);
2661 OBD_FREE_PTR(ls->ls_client_fld);
2662 GOTO(out_fld_fini, rc);
/* The local server is a direct target (ft_exp == NULL => no RPC hop). */
2665 target.ft_srv = ls->ls_server_fld;
2666 target.ft_idx = ls->ls_node_id;
2667 target.ft_exp = NULL;
2669 fld_client_add_target(ls->ls_client_fld, &target);
2673 mdt_fld_fini(env, m);
2677 /* device init/fini methods */
/* Unregister every ptlrpc service this MDT started (regular, readpage,
 * setattr, seq controller/servers, FLD), NULLing each pointer so the
 * function is idempotent. */
2678 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
2680 if (m->mdt_regular_service != NULL) {
2681 ptlrpc_unregister_service(m->mdt_regular_service);
2682 m->mdt_regular_service = NULL;
2684 if (m->mdt_readpage_service != NULL) {
2685 ptlrpc_unregister_service(m->mdt_readpage_service);
2686 m->mdt_readpage_service = NULL;
2688 if (m->mdt_setattr_service != NULL) {
2689 ptlrpc_unregister_service(m->mdt_setattr_service);
2690 m->mdt_setattr_service = NULL;
2692 if (m->mdt_mdsc_service != NULL) {
2693 ptlrpc_unregister_service(m->mdt_mdsc_service);
2694 m->mdt_mdsc_service = NULL;
2696 if (m->mdt_mdss_service != NULL) {
2697 ptlrpc_unregister_service(m->mdt_mdss_service);
2698 m->mdt_mdss_service = NULL;
2700 if (m->mdt_dtss_service != NULL) {
2701 ptlrpc_unregister_service(m->mdt_dtss_service);
2702 m->mdt_dtss_service = NULL;
2704 if (m->mdt_fld_service != NULL) {
2705 ptlrpc_unregister_service(m->mdt_fld_service);
2706 m->mdt_fld_service = NULL;
2710 static int mdt_start_ptlrpc_service(struct mdt_device *m)
2713 static struct ptlrpc_service_conf conf;
2716 conf = (typeof(conf)) {
2717 .psc_nbufs = MDS_NBUFS,
2718 .psc_bufsize = MDS_BUFSIZE,
2719 .psc_max_req_size = MDS_MAXREQSIZE,
2720 .psc_max_reply_size = MDS_MAXREPSIZE,
2721 .psc_req_portal = MDS_REQUEST_PORTAL,
2722 .psc_rep_portal = MDC_REPLY_PORTAL,
2723 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2725 * We'd like to have a mechanism to set this on a per-device
2726 * basis, but alas...
2728 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2730 .psc_ctx_tags = LCT_MD_THREAD
2733 m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
2734 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2735 "mdt_ldlm_client", m->mdt_ldlm_client);
2737 m->mdt_regular_service =
2738 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
2739 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2741 if (m->mdt_regular_service == NULL)
2744 rc = ptlrpc_start_threads(NULL, m->mdt_regular_service, LUSTRE_MDT_NAME);
2746 GOTO(err_mdt_svc, rc);
2749 * readpage service configuration. Parameters have to be adjusted,
2752 conf = (typeof(conf)) {
2753 .psc_nbufs = MDS_NBUFS,
2754 .psc_bufsize = MDS_BUFSIZE,
2755 .psc_max_req_size = MDS_MAXREQSIZE,
2756 .psc_max_reply_size = MDS_MAXREPSIZE,
2757 .psc_req_portal = MDS_READPAGE_PORTAL,
2758 .psc_rep_portal = MDC_REPLY_PORTAL,
2759 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2760 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2762 .psc_ctx_tags = LCT_MD_THREAD
2764 m->mdt_readpage_service =
2765 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
2766 LUSTRE_MDT_NAME "_readpage",
2767 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2770 if (m->mdt_readpage_service == NULL) {
2771 CERROR("failed to start readpage service\n");
2772 GOTO(err_mdt_svc, rc = -ENOMEM);
2775 rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
2778 * setattr service configuration.
2780 conf = (typeof(conf)) {
2781 .psc_nbufs = MDS_NBUFS,
2782 .psc_bufsize = MDS_BUFSIZE,
2783 .psc_max_req_size = MDS_MAXREQSIZE,
2784 .psc_max_reply_size = MDS_MAXREPSIZE,
2785 .psc_req_portal = MDS_SETATTR_PORTAL,
2786 .psc_rep_portal = MDC_REPLY_PORTAL,
2787 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2788 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2790 .psc_ctx_tags = LCT_MD_THREAD
2793 m->mdt_setattr_service =
2794 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
2795 LUSTRE_MDT_NAME "_setattr",
2796 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2799 if (!m->mdt_setattr_service) {
2800 CERROR("failed to start setattr service\n");
2801 GOTO(err_mdt_svc, rc = -ENOMEM);
2804 rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
2806 GOTO(err_mdt_svc, rc);
2809 * sequence controller service configuration
2811 conf = (typeof(conf)) {
2812 .psc_nbufs = MDS_NBUFS,
2813 .psc_bufsize = MDS_BUFSIZE,
2814 .psc_max_req_size = SEQ_MAXREQSIZE,
2815 .psc_max_reply_size = SEQ_MAXREPSIZE,
2816 .psc_req_portal = SEQ_CONTROLLER_PORTAL,
2817 .psc_rep_portal = MDC_REPLY_PORTAL,
2818 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2819 .psc_num_threads = SEQ_NUM_THREADS,
2820 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2823 m->mdt_mdsc_service =
2824 ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
2825 LUSTRE_MDT_NAME"_mdsc",
2826 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2828 if (!m->mdt_mdsc_service) {
2829 CERROR("failed to start seq controller service\n");
2830 GOTO(err_mdt_svc, rc = -ENOMEM);
2833 rc = ptlrpc_start_threads(NULL, m->mdt_mdsc_service, "mdt_mdsc");
2835 GOTO(err_mdt_svc, rc);
2838 * metadata sequence server service configuration
2840 conf = (typeof(conf)) {
2841 .psc_nbufs = MDS_NBUFS,
2842 .psc_bufsize = MDS_BUFSIZE,
2843 .psc_max_req_size = SEQ_MAXREQSIZE,
2844 .psc_max_reply_size = SEQ_MAXREPSIZE,
2845 .psc_req_portal = SEQ_METADATA_PORTAL,
2846 .psc_rep_portal = MDC_REPLY_PORTAL,
2847 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2848 .psc_num_threads = SEQ_NUM_THREADS,
2849 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2852 m->mdt_mdss_service =
2853 ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
2854 LUSTRE_MDT_NAME"_mdss",
2855 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2857 if (!m->mdt_mdss_service) {
2858 CERROR("failed to start metadata seq server service\n");
2859 GOTO(err_mdt_svc, rc = -ENOMEM);
2862 rc = ptlrpc_start_threads(NULL, m->mdt_mdss_service, "mdt_mdss");
2864 GOTO(err_mdt_svc, rc);
2868 * Data sequence server service configuration. We want to have really
2869 * cluster-wide sequences space. This is why we start only one sequence
2870 * controller which manages space.
2872 conf = (typeof(conf)) {
2873 .psc_nbufs = MDS_NBUFS,
2874 .psc_bufsize = MDS_BUFSIZE,
2875 .psc_max_req_size = SEQ_MAXREQSIZE,
2876 .psc_max_reply_size = SEQ_MAXREPSIZE,
2877 .psc_req_portal = SEQ_DATA_PORTAL,
2878 .psc_rep_portal = OSC_REPLY_PORTAL,
2879 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2880 .psc_num_threads = SEQ_NUM_THREADS,
2881 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2884 m->mdt_dtss_service =
2885 ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
2886 LUSTRE_MDT_NAME"_dtss",
2887 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2889 if (!m->mdt_dtss_service) {
2890 CERROR("failed to start data seq server service\n");
2891 GOTO(err_mdt_svc, rc = -ENOMEM);
2894 rc = ptlrpc_start_threads(NULL, m->mdt_dtss_service, "mdt_dtss");
2896 GOTO(err_mdt_svc, rc);
2898 /* FLD service start */
2899 conf = (typeof(conf)) {
2900 .psc_nbufs = MDS_NBUFS,
2901 .psc_bufsize = MDS_BUFSIZE,
2902 .psc_max_req_size = FLD_MAXREQSIZE,
2903 .psc_max_reply_size = FLD_MAXREPSIZE,
2904 .psc_req_portal = FLD_REQUEST_PORTAL,
2905 .psc_rep_portal = MDC_REPLY_PORTAL,
2906 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2907 .psc_num_threads = FLD_NUM_THREADS,
2908 .psc_ctx_tags = LCT_DT_THREAD|LCT_MD_THREAD
2911 m->mdt_fld_service =
2912 ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
2913 LUSTRE_MDT_NAME"_fld",
2914 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2916 if (!m->mdt_fld_service) {
2917 CERROR("failed to start fld service\n");
2918 GOTO(err_mdt_svc, rc = -ENOMEM);
2921 rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, "mdt_fld");
2923 GOTO(err_mdt_svc, rc);
2928 mdt_stop_ptlrpc_service(m);
/*
 * Tear down the lu_device stack below the MDT: broadcast an LCFG_CLEANUP
 * config down the stack, purge the site's object cache, then walk the
 * layers calling device_fini()/device_free() on each.
 * NOTE(review): this listing has elided lines (loop header, error branch,
 * closing braces) — comments describe only the visible statements.
 */
2933 static void mdt_stack_fini(const struct lu_env *env,
2934 struct mdt_device *m, struct lu_device *top)
2936 struct lu_device *d = top, *n;
2937 struct lustre_cfg_bufs *bufs;
2938 struct lustre_cfg *lcfg;
2939 struct mdt_thread_info *info;
/* Per-thread scratch area; mti_u.bufs is reused for the cleanup config. */
2942 info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
2943 LASSERT(info != NULL);
2945 bufs = &info->mti_u.bufs;
2946 /* process cleanup */
2947 lustre_cfg_bufs_reset(bufs, NULL);
2948 lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
2950 CERROR("Cannot alloc lcfg!\n");
/* Propagate LCFG_CLEANUP through every layer before freeing them. */
2954 top->ld_ops->ldo_process_config(env, top, lcfg);
2955 lustre_cfg_free(lcfg);
/* Drop all cached lu_objects so layer refcounts can reach zero. */
2957 lu_site_purge(env, top->ld_site, ~0);
2959 struct obd_type *type;
2960 struct lu_device_type *ldt = d->ld_type;
2962 /* each fini() returns next device in stack of layers
2963 * * so we can avoid the recursion */
2964 n = ldt->ldt_ops->ldto_device_fini(env, d);
2966 ldt->ldt_ops->ldto_device_free(env, d);
2967 type = ldt->ldt_obd_type;
/* Release the module/type reference taken in mdt_layer_setup(). */
2969 class_put_type(type);
2971 /* switch to the next device in the layer */
2974 m->mdt_child = NULL;
/*
 * Allocate and initialize one layer of the MDT device stack (osd, mdd or
 * cmm) by type name, wiring it on top of @child.  Returns the new
 * lu_device; the elided tail presumably returns ERR_PTR(rc) on failure —
 * TODO confirm against the full source.
 */
2977 static struct lu_device *mdt_layer_setup(const struct lu_env *env,
2978 const char *typename,
2979 struct lu_device *child,
2980 struct lustre_cfg *cfg)
2982 struct obd_type *type;
2983 struct lu_device_type *ldt;
2984 struct lu_device *d;
/* Take a reference on the obd type (pins the layer's module). */
2989 type = class_get_type(typename);
2991 CERROR("Unknown type: '%s'\n", typename);
2992 GOTO(out, rc = -ENODEV);
/* The new layer may have registered new context keys: refill contexts. */
2995 rc = lu_context_refill(&env->le_ctx);
2997 CERROR("Failure to refill context: '%d'\n", rc);
3001 if (env->le_ses != NULL) {
3002 rc = lu_context_refill(env->le_ses);
3004 CERROR("Failure to refill session: '%d'\n", rc);
3011 CERROR("type: '%s'\n", typename);
3012 GOTO(out_type, rc = -EINVAL);
3015 ldt->ldt_obd_type = type;
3016 d = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
3018 CERROR("Cannot allocate device: '%s'\n", typename);
3019 GOTO(out_type, rc = -ENODEV);
/* All layers share the top device's lu_site. */
3022 LASSERT(child->ld_site);
3023 d->ld_site = child->ld_site;
3026 rc = ldt->ldt_ops->ldto_device_init(env, d, child);
3028 CERROR("can't init device '%s', rc %d\n", typename, rc);
3029 GOTO(out_alloc, rc);
/* Error unwinding: free the allocated device, then drop the type ref. */
3036 ldt->ldt_ops->ldto_device_free(env, d);
3039 class_put_type(type);
/*
 * Build the MDT's underlying device stack bottom-up: osd -> mdd -> cmm,
 * connect the md_upcall chain back toward the MDT, record the direct
 * child in m->mdt_child, and finally push the setup config down the stack.
 */
3044 static int mdt_stack_init(const struct lu_env *env,
3045 struct mdt_device *m, struct lustre_cfg *cfg)
3047 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
3048 struct lu_device *tmp;
3049 struct md_device *md;
3053 /* init the stack */
3054 tmp = mdt_layer_setup(env, LUSTRE_OSD_NAME, d, cfg);
3056 RETURN(PTR_ERR(tmp));
/* Bottom of the stack is the OSD (dt_device). */
3058 m->mdt_bottom = lu2dt_dev(tmp);
3060 tmp = mdt_layer_setup(env, LUSTRE_MDD_NAME, d, cfg);
3062 GOTO(out, rc = PTR_ERR(tmp));
3067 tmp = mdt_layer_setup(env, LUSTRE_CMM_NAME, d, cfg);
3069 GOTO(out, rc = PTR_ERR(tmp));
3072 /*set mdd upcall device*/
3073 md->md_upcall.mu_upcall_dev = lu2md_dev(d);
3076 /*set cmm upcall device*/
3077 md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
/* The MDT talks to the topmost md layer (cmm) as its child. */
3079 m->mdt_child = lu2md_dev(d);
3081 /* process setup config */
3082 tmp = &m->mdt_md_dev.md_lu_dev;
3083 rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg);
3086 /* fini from last known good lu_device */
3088 mdt_stack_fini(env, m, d);
/*
 * Full MDT shutdown: stop recovery and services, destroy upcall caches,
 * free the LDLM namespace, fini seq/fld, release capability state, clean
 * the filesystem state and finally tear down the device stack.
 * Ordering mirrors (reverse of) mdt_init0().
 */
3093 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
3095 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
3096 struct lu_site *ls = d->ld_site;
3099 target_cleanup_recovery(m->mdt_md_dev.md_lu_dev.ld_obd);
3101 ping_evictor_stop();
3102 mdt_stop_ptlrpc_service(m);
/* NULL the cache pointers after cleanup so later paths see them gone. */
3104 upcall_cache_cleanup(m->mdt_rmtacl_cache);
3105 m->mdt_rmtacl_cache = NULL;
3107 upcall_cache_cleanup(m->mdt_identity_cache);
3108 m->mdt_identity_cache = NULL;
3110 if (m->mdt_namespace != NULL) {
3111 ldlm_namespace_free(m->mdt_namespace, 0);
3112 d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
3115 mdt_seq_fini(env, m);
3116 mdt_seq_fini_cli(m);
3117 mdt_fld_fini(env, m);
3119 if (m->mdt_rootsquash_info) {
3120 OBD_FREE_PTR(m->mdt_rootsquash_info);
3121 m->mdt_rootsquash_info = NULL;
/* Capability keys: stop the timer before the key-refresh thread. */
3124 cleanup_capas(CAPA_SITE_SERVER);
3125 del_timer(&m->mdt_ck_timer);
3126 mdt_ck_thread_stop(m);
3128 mdt_fs_cleanup(env, m);
3130 /* finish the stack */
3131 mdt_stack_fini(env, m, md2lu_dev(m->mdt_child))
3138 LASSERT(atomic_read(&d->ld_ref) == 0);
3139 md_device_fini(&m->mdt_md_dev);
/* Forward declaration: recovery-completion hook, defined later in this file. */
3144 int mdt_postrecov(const struct lu_env *, struct mdt_device *);
/*
 * First-stage MDT setup driven by the LCFG_SETUP config record:
 * initialize locks and default options, build the device stack, start
 * fld/seq services, create the LDLM namespace, the identity/rmtacl
 * upcall caches, the capability-key thread and timer, the ptlrpc
 * services, and finally the on-disk state.  The trailing err_* labels
 * unwind in reverse order.
 */
3146 static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
3147 struct lu_device_type *ldt, struct lustre_cfg *cfg)
3149 struct lprocfs_static_vars lvars;
3150 struct mdt_thread_info *info;
3151 struct obd_device *obd;
/* lcfg buf 0 = device name, buf 2 = server index string. */
3152 const char *dev = lustre_cfg_string(cfg, 0);
3153 const char *num = lustre_cfg_string(cfg, 2);
3158 info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3159 LASSERT(info != NULL);
3161 obd = class_name2obd(dev);
3164 spin_lock_init(&m->mdt_transno_lock);
3166 m->mdt_max_mdsize = MAX_MD_SIZE;
3167 m->mdt_max_cookiesize = sizeof(struct llog_cookie);
3169 spin_lock_init(&m->mdt_ioepoch_lock);
3170 /* Temporary. should parse mount option. */
3171 m->mdt_opts.mo_user_xattr = 0;
3172 m->mdt_opts.mo_acl = 0;
3173 m->mdt_opts.mo_compat_resname = 0;
3174 m->mdt_opts.mo_mds_capa = 0;
3175 m->mdt_opts.mo_oss_capa = 0;
3176 m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
3177 m->mdt_capa_timeout = CAPA_TIMEOUT;
3178 m->mdt_ck_timeout = CAPA_KEY_TIMEOUT;
3179 obd->obd_replayable = 1;
3180 spin_lock_init(&m->mdt_client_bitmap_lock);
3186 md_device_init(&m->mdt_md_dev, ldt);
3187 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
3188 m->mdt_md_dev.md_lu_dev.ld_obd = obd;
3189 /* set this lu_device to obd, because error handling need it */
3190 obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev;
/* 's' is the lu_site; its allocation is in lines elided from this listing. */
3192 rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
3194 CERROR("can't init lu_site, rc %d\n", rc);
3195 GOTO(err_free_site, rc);
3198 lprocfs_init_vars(mdt, &lvars);
3199 rc = lprocfs_obd_setup(obd, lvars.obd_vars);
3201 CERROR("can't init lprocfs, rc %d\n", rc);
3202 GOTO(err_fini_site, rc);
3205 /* init the stack */
3206 rc = mdt_stack_init(env, m, cfg);
3208 CERROR("can't init device stack, rc %d\n", rc);
3209 GOTO(err_fini_site, rc);
3212 /* set server index */
3214 s->ls_node_id = simple_strtol(num, NULL, 10);
3216 rc = mdt_fld_init(env, obd->obd_name, m);
3218 GOTO(err_fini_stack, rc);
3220 rc = mdt_seq_init(env, obd->obd_name, m);
3222 GOTO(err_fini_fld, rc);
/* Namespace name is unique per device instance ("mdt-%p"). */
3224 snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
3225 LUSTRE_MDT_NAME"-%p", m);
3226 m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
3227 LDLM_NAMESPACE_SERVER)
3228 if (m->mdt_namespace == NULL)
3229 GOTO(err_fini_seq, rc = -ENOMEM);
3231 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
3232 /* set obd_namespace for compatibility with old code */
3233 obd->obd_namespace = m->mdt_namespace;
3235 m->mdt_identity_cache = upcall_cache_init(obd->obd_name,
3236 MDT_IDENTITY_UPCALL_PATH,
3237 &mdt_identity_upcall_cache_ops);
3238 if (IS_ERR(m->mdt_identity_cache)) {
3239 rc = PTR_ERR(m->mdt_identity_cache);
3240 m->mdt_identity_cache = NULL;
3241 GOTO(err_free_ns, rc);
3244 m->mdt_rmtacl_cache = upcall_cache_init(obd->obd_name,
3245 MDT_RMTACL_UPCALL_PATH,
3246 &mdt_rmtacl_upcall_cache_ops);
3247 if (IS_ERR(m->mdt_rmtacl_cache)) {
3248 rc = PTR_ERR(m->mdt_rmtacl_cache);
3249 m->mdt_rmtacl_cache = NULL;
3250 GOTO(err_free_ns, rc);
3253 rc = mdt_ck_thread_start(m);
3255 GOTO(err_free_ns, rc);
/* NOTE(review): .function/.data are assigned BEFORE init_timer(); the
 * conventional kernel order is init_timer() first (or setup_timer()).
 * Verify init_timer() in this kernel does not clobber these fields. */
3256 m->mdt_ck_timer.function = mdt_ck_timer_callback;
3257 m->mdt_ck_timer.data = (unsigned long)m;
3258 init_timer(&m->mdt_ck_timer);
/* Export capability parameters through the site for lower layers. */
3260 s->ls_capa_keys = m->mdt_capa_keys;
3261 s->ls_capa_timeout = m->mdt_capa_timeout;
3262 s->ls_capa_alg = m->mdt_capa_alg;
3264 rc = mdt_start_ptlrpc_service(m);
3268 ping_evictor_start();
3269 rc = mdt_fs_setup(env, m, obd);
3271 GOTO(err_stop_service, rc);
3273 if(obd->obd_recovering == 0)
3274 mdt_postrecov(env, m);
/* GSS disabled by default; may be re-enabled via config params. */
3276 m->mdt_opts.mo_no_gss_support = 1;
/* Error unwinding (labels elided in this listing), reverse of setup. */
3281 mdt_stop_ptlrpc_service(m);
3283 del_timer(&m->mdt_ck_timer);
3284 mdt_ck_thread_stop(m);
3286 upcall_cache_cleanup(m->mdt_rmtacl_cache);
3287 m->mdt_rmtacl_cache = NULL;
3288 upcall_cache_cleanup(m->mdt_identity_cache);
3289 m->mdt_identity_cache = NULL;
3290 ldlm_namespace_free(m->mdt_namespace, 0);
3291 obd->obd_namespace = m->mdt_namespace = NULL;
3293 mdt_seq_fini(env, m);
3295 mdt_fld_fini(env, m);
3297 mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
3303 md_device_fini(&m->mdt_md_dev);
3307 /* FIXME: this macro is copied from lnet/libcfs/nidstring.c */
3308 #define LNET_NIDSTR_SIZE 32 /* size of each one (see below for usage) */
/*
 * Parse a comma-separated NID list from @buf into the device's
 * rootsquash_info nosquash array, stopping at N_NOSQUASH_NIDS entries.
 * "*" and unparseable tokens are handled by elided branches.
 */
3309 static void do_process_nosquash_nids(struct mdt_device *m, char *buf)
3311 struct rootsquash_info *rsi = m->mdt_rootsquash_info;
3312 char str[LNET_NIDSTR_SIZE], *end;
3316 rsi->rsi_n_nosquash_nids = 0;
3317 while (rsi->rsi_n_nosquash_nids < N_NOSQUASH_NIDS) {
3318 end = strchr(buf, ',');
/* memset gives NUL padding, but see the strncpy note below. */
3319 memset(str, 0, sizeof(str));
/* NOTE(review): if the token length reaches sizeof(str), strncpy fills
 * the whole buffer and leaves no NUL terminator — confirm tokens are
 * always < LNET_NIDSTR_SIZE or bound the copy by sizeof(str) - 1. */
3321 strncpy(str, buf, min_t(int, sizeof(str), end - buf));
3323 strncpy(str, buf, min_t(int, sizeof(str), strlen(buf)));
3325 if (!strcmp(str, "*")) {
3328 nid = libcfs_str2nid(str);
3329 if (nid == LNET_NID_ANY)
3332 rsi->rsi_nosquash_nids[rsi->rsi_n_nosquash_nids++] = nid;
/* Stop when no comma remains or the list ends right after it. */
3334 if (!end || (*(end + 1) == 0))
3340 /* used by MGS to process specific configurations */
/*
 * Handle config records addressed to the MDT layer.  PARAM records the
 * MDT understands (gss support, rootsquash uid/gid/skips) are consumed
 * here; everything else is forwarded to the child md device.
 */
3341 static int mdt_process_config(const struct lu_env *env,
3342 struct lu_device *d, struct lustre_cfg *cfg)
3344 struct mdt_device *m = mdt_dev(d);
3345 struct md_device *md_next = m->mdt_child;
3346 struct lu_device *next = md2lu_dev(md_next);
3350 switch (cfg->lcfg_command) {
/* Each PARAM buffer is a "key=value" string. */
3354 for (i = 1; i < cfg->lcfg_bufcount; i++) {
3357 key = lustre_cfg_buf(cfg, i);
3358 val = strchr(key, '=');
3359 if (!val || (*(val + 1) == 0)) {
3360 CERROR("Can't parse param %s\n", key);
3362 /* continue parsing other params */
3367 if (class_match_param(key,
3368 PARAM_GSS_SUPPORT, 0) == 0) {
3369 if (memcmp(val, "no", 2) == 0) {
3370 m->mdt_opts.mo_no_gss_support = 1;
3371 } else if (memcmp(val, "yes", 3) == 0) {
3372 m->mdt_opts.mo_no_gss_support = 0;
3374 CERROR("Can't parse param %s\n", key);
3376 /* continue parsing other params */
3379 } else if (class_match_param(key,
3380 PARAM_ROOTSQUASH_UID, 0) == 0) {
/* rootsquash_info is allocated lazily on first rootsquash param. */
3381 if (!m->mdt_rootsquash_info)
3382 OBD_ALLOC_PTR(m->mdt_rootsquash_info);
3383 if (!m->mdt_rootsquash_info)
3386 m->mdt_rootsquash_info->rsi_uid =
3387 simple_strtoul(val, NULL, 0);
3388 } else if (class_match_param(key,
3389 PARAM_ROOTSQUASH_GID, 0) == 0) {
3390 if (!m->mdt_rootsquash_info)
3391 OBD_ALLOC_PTR(m->mdt_rootsquash_info);
3392 if (!m->mdt_rootsquash_info)
3395 m->mdt_rootsquash_info->rsi_gid =
3396 simple_strtoul(val, NULL, 0);
3397 } else if (class_match_param(key,
3398 PARAM_ROOTSQUASH_SKIPS, 0) == 0) {
3399 if (!m->mdt_rootsquash_info)
3400 OBD_ALLOC_PTR(m->mdt_rootsquash_info);
3401 if (!m->mdt_rootsquash_info)
3404 do_process_nosquash_nids(m, val);
3411 /* others are passed further */
3412 rc = next->ld_ops->ldo_process_config(env, next, cfg);
3417 * Add mdc hook to get first MDT uuid and connect it to
3418 * ls->controller to use for seq manager.
3420 rc = mdt_seq_init_cli(env, mdt_dev(d), cfg);
3422 CERROR("can't initialize controller export, "
3426 /* others are passed further */
3427 rc = next->ld_ops->ldo_process_config(env, next, cfg);
/*
 * Allocate an mdt_object and initialize its embedded lu_object/header,
 * installing mdt_obj_ops.  Returns the lu_object for the lu cache layer
 * (NULL-return path is in lines elided from this listing).
 */
3433 static struct lu_object *mdt_object_alloc(const struct lu_env *env,
3434 const struct lu_object_header *hdr,
3435 struct lu_device *d)
3437 struct mdt_object *mo;
3443 struct lu_object *o;
3444 struct lu_object_header *h;
3446 o = &mo->mot_obj.mo_lu;
3447 h = &mo->mot_header;
3448 lu_object_header_init(h);
3449 lu_object_init(o, h, d);
/* MDT is the top layer of the object stack. */
3450 lu_object_add_top(h, o);
3451 o->lo_ops = &mdt_obj_ops;
/*
 * Second stage of object setup: ask the child md device to allocate its
 * slice for the same header and link it below the MDT slice.
 */
3457 static int mdt_object_init(const struct lu_env *env, struct lu_object *o)
3459 struct mdt_device *d = mdt_dev(o->lo_dev);
3460 struct lu_device *under;
3461 struct lu_object *below;
3465 CDEBUG(D_INFO, "object init, fid = "DFID"\n",
3466 PFID(lu_object_fid(o)));
3468 under = &d->mdt_child->md_lu_dev;
3469 below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
3470 if (below != NULL) {
3471 lu_object_add(o, below);
/* Release the MDT slice of an object: fini the lu machinery, free memory. */
3477 static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
3479 struct mdt_object *mo = mdt_obj(o);
3480 struct lu_object_header *h;
3484 CDEBUG(D_INFO, "object free, fid = "DFID"\n",
3485 PFID(lu_object_fid(o)));
3488 lu_object_header_fini(h);
/* Debug printer for an mdt object: emits "mdt-object@<ptr>" via @p. */
3493 static int mdt_object_print(const struct lu_env *env, void *cookie,
3494 lu_printer_t p, const struct lu_object *o)
3496 return (*p)(env, cookie, LUSTRE_MDT_NAME"-object@%p", o);
/* lu_device operations: object allocation and config processing. */
3499 static struct lu_device_operations mdt_lu_ops = {
3500 .ldo_object_alloc = mdt_object_alloc,
3501 .ldo_process_config = mdt_process_config
/* lu_object operations for the MDT slice of an object. */
3504 static struct lu_object_operations mdt_obj_ops = {
3505 .loo_object_init = mdt_object_init,
3506 .loo_object_free = mdt_object_free,
3507 .loo_object_print = mdt_object_print
3510 /* mds_connect_internal */
/*
 * Negotiate connect flags with a client: mask unsupported/disabled
 * features out of ocd_connect_flags, record the agreed flags on the
 * export, and reject inconsistent combinations (the CWARN branches;
 * their return statements are elided in this listing).
 */
3511 static int mdt_connect_internal(struct obd_export *exp,
3512 struct mdt_device *mdt,
3513 struct obd_connect_data *data)
3518 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
3519 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
3521 /* If no known bits (which should not happen, probably,
3522 as everybody should support LOOKUP and UPDATE bits at least)
3523 revert to compat mode with plain locks. */
3524 if (!data->ocd_ibits_known &&
3525 data->ocd_connect_flags & OBD_CONNECT_IBITS)
3526 data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
/* Strip features this MDT has disabled via mdt_opts. */
3528 if (!mdt->mdt_opts.mo_acl)
3529 data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
3531 if (!mdt->mdt_opts.mo_user_xattr)
3532 data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
3534 if (!mdt->mdt_opts.mo_mds_capa)
3535 data->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
3537 if (!mdt->mdt_opts.mo_oss_capa)
3538 data->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
/* Persist the negotiated result on the export for later checks. */
3540 exp->exp_connect_flags = data->ocd_connect_flags;
3541 data->ocd_version = LUSTRE_VERSION_CODE;
3542 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
/* Below: sanity checks; mandatory features the client failed to offer. */
3545 if (mdt->mdt_opts.mo_acl &&
3546 ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
3547 CWARN("%s: MDS requires ACL support but client does not\n",
3548 mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
/* A client cannot be both local and remote at once. */
3552 flags = OBD_CONNECT_LCL_CLIENT | OBD_CONNECT_RMT_CLIENT;
3553 if ((exp->exp_connect_flags & flags) == flags) {
3554 CWARN("%s: both local and remote client flags are set\n",
3555 mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3559 if (mdt->mdt_opts.mo_mds_capa &&
3560 ((exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) == 0)) {
3561 CWARN("%s: MDS requires capability support, but client not\n",
3562 mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3566 if (mdt->mdt_opts.mo_oss_capa &&
3567 ((exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA) == 0)) {
3568 CWARN("%s: MDS requires OSS capability support, "
3570 mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3577 /* mds_connect copy */
/*
 * obd connect handler: create the export, negotiate flags via
 * mdt_connect_internal(), allocate per-client data (mcd) and register
 * the client for recovery via mdt_client_new().  On failure the export
 * is disconnected and its reference dropped.
 */
3578 static int mdt_obd_connect(const struct lu_env *env,
3579 struct lustre_handle *conn, struct obd_device *obd,
3580 struct obd_uuid *cluuid,
3581 struct obd_connect_data *data)
3583 struct mdt_export_data *med;
3584 struct mdt_client_data *mcd;
3585 struct obd_export *exp;
3586 struct mdt_device *mdt;
3590 LASSERT(env != NULL);
3591 if (!conn || !obd || !cluuid)
3594 mdt = mdt_dev(obd->obd_lu_dev);
3596 rc = class_connect(conn, obd, cluuid);
/* class_connect() succeeded: conn now maps to a new export. */
3600 exp = class_conn2export(conn);
3601 LASSERT(exp != NULL);
3602 med = &exp->exp_mdt_data;
3604 rc = mdt_connect_internal(exp, mdt, data);
/* mcd allocation is elided; record the client uuid for recovery. */
3608 memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
3610 rc = mdt_client_new(env, mdt, med);
3613 med->med_mcd = NULL;
/* Error path: undo class_connect() and drop the export reference. */
3620 class_disconnect(exp);
3622 class_export_put(exp);
/*
 * obd reconnect handler: re-negotiate connect flags on an existing
 * export (no new client state is created).
 */
3627 static int mdt_obd_reconnect(struct obd_export *exp, struct obd_device *obd,
3628 struct obd_uuid *cluuid,
3629 struct obd_connect_data *data)
3634 if (exp == NULL || obd == NULL || cluuid == NULL)
3637 rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
/*
 * obd disconnect handler: detach the export first so no new requests
 * use it, cancel its LDLM locks, and force out any outstanding
 * "difficult" replies before dropping the final reference.
 */
3642 static int mdt_obd_disconnect(struct obd_export *exp)
3644 struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
/* Hold a ref across the teardown; released at the end. */
3649 class_export_get(exp);
3651 /* Disconnect early so that clients can't keep using export */
3652 rc = class_disconnect(exp);
/* NOTE(review): '||' means locks are cancelled if EITHER namespace
 * exists — confirm this is intended vs. guarding each separately. */
3653 if (mdt->mdt_namespace != NULL || exp->exp_obd->obd_namespace != NULL)
3654 ldlm_cancel_locks_for_export(exp);
3656 /* complete all outstanding replies */
3657 spin_lock(&exp->exp_lock);
3658 while (!list_empty(&exp->exp_outstanding_replies)) {
3659 struct ptlrpc_reply_state *rs =
3660 list_entry(exp->exp_outstanding_replies.next,
3661 struct ptlrpc_reply_state, rs_exp_list);
3662 struct ptlrpc_service *svc = rs->rs_service;
/* Reschedule each reply under its service lock. */
3664 spin_lock(&svc->srv_lock);
3665 list_del_init(&rs->rs_exp_list);
3666 ptlrpc_schedule_difficult_reply(rs);
3667 spin_unlock(&svc->srv_lock);
3669 spin_unlock(&exp->exp_lock);
3671 class_export_put(exp);
3675 /* FIXME: Can we avoid using these two interfaces? */
/* Per-export setup: open-file list/lock, and mark export as connecting. */
3676 static int mdt_init_export(struct obd_export *exp)
3678 struct mdt_export_data *med = &exp->exp_mdt_data;
3681 INIT_LIST_HEAD(&med->med_open_head);
3682 spin_lock_init(&med->med_open_lock);
3683 exp->exp_connecting = 1;
/*
 * Final teardown of a client export: clean idmap state for remote
 * clients, close every file the client still had open (which may unlink
 * orphans), and remove the client's recovery record.  Builds a private
 * lu_env since this can run outside a service thread.
 */
3687 static int mdt_destroy_export(struct obd_export *export)
3689 struct mdt_export_data *med;
3690 struct obd_device *obd = export->exp_obd;
3691 struct mdt_device *mdt;
3692 struct mdt_thread_info *info;
3698 med = &export->exp_mdt_data;
3699 if (med->med_rmtclient)
3700 mdt_cleanup_idmap(med);
3702 target_destroy_export(export);
/* Self-export (obd's own uuid) has no client state to clean. */
3704 if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
3707 mdt = mdt_dev(obd->obd_lu_dev);
3708 LASSERT(mdt != NULL);
3710 rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
3714 info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
3715 LASSERT(info != NULL);
3716 memset(info, 0, sizeof *info);
3717 info->mti_env = &env;
3718 info->mti_mdt = mdt;
/* Scratch buffers for LOV EA and llog cookies used by mdt_mfd_close(). */
3720 ma = &info->mti_attr;
3721 ma->ma_lmm_size = mdt->mdt_max_mdsize;
3722 ma->ma_cookie_size = mdt->mdt_max_cookiesize;
3723 OBD_ALLOC(ma->ma_lmm, mdt->mdt_max_mdsize);
3724 OBD_ALLOC(ma->ma_cookie, mdt->mdt_max_cookiesize);
3726 if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
3727 GOTO(out, rc = -ENOMEM);
3728 ma->ma_need = MA_LOV | MA_COOKIE;
3730 /* Close any open files (which may also cause orphan unlinking). */
3731 spin_lock(&med->med_open_lock);
3732 while (!list_empty(&med->med_open_head)) {
3733 struct list_head *tmp = med->med_open_head.next;
3734 struct mdt_file_data *mfd =
3735 list_entry(tmp, struct mdt_file_data, mfd_list);
3736 struct md_attr *ma = &info->mti_attr;
3738 /* Remove mfd handle so it can't be found again.
3739 * We are consuming the mfd_list reference here. */
3740 class_handle_unhash(&mfd->mfd_handle);
3741 list_del_init(&mfd->mfd_list);
/* Drop the spinlock across the (blocking) close, then retake it. */
3742 spin_unlock(&med->med_open_lock);
3743 mdt_mfd_close(info, mfd);
3744 /* TODO: if we close the unlinked file,
3745 * we need to remove it's objects from OST */
3746 memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
3747 spin_lock(&med->med_open_lock);
3749 spin_unlock(&med->med_open_lock);
3750 info->mti_mdt = NULL;
/* Remove the client's slot from the last_rcvd file. */
3751 mdt_client_del(&env, mdt, med);
3755 OBD_FREE(ma->ma_lmm, mdt->mdt_max_mdsize);
3757 OBD_FREE(ma->ma_cookie, mdt->mdt_max_cookiesize);
/*
 * Upcall from the child md device: refresh cached max EA/cookie sizes,
 * or mark the current thread as not needing an MDT transaction
 * (case labels are elided in this listing).
 */
3763 static int mdt_upcall(const struct lu_env *env, struct md_device *md,
3764 enum md_upcall_event ev)
3766 struct mdt_device *m = mdt_dev(&md->md_lu_dev);
3767 struct md_device *next = m->mdt_child;
3768 struct mdt_thread_info *mti;
3774 rc = next->md_ops->mdo_maxsize_get(env, next,
3776 &m->mdt_max_cookiesize);
3777 CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
3778 m->mdt_max_mdsize, m->mdt_max_cookiesize);
3781 mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3782 mti->mti_no_need_trans = 1;
3783 CDEBUG(D_INFO, "disable mdt trans for this thread\n");
3786 CERROR("invalid event\n");
/*
 * ioctl dispatcher for the MDT obd device: sync, set-readonly and
 * abort-recovery; everything else is rejected with an error message.
 */
3793 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3794 void *karg, void *uarg)
3797 struct obd_device *obd= exp->exp_obd;
3798 struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
3799 struct dt_device *dt = mdt->mdt_bottom;
3803 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
3804 rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
3810 rc = dt->dd_ops->dt_sync(&env, dt);
3813 case OBD_IOC_SET_READONLY:
3814 rc = dt->dd_ops->dt_sync(&env, dt);
/* NOTE(review): dt_ro() return value is discarded — confirm intended. */
3815 dt->dd_ops->dt_ro(&env, dt);
3818 case OBD_IOC_ABORT_RECOVERY:
3819 CERROR("aborting recovery for device %s\n", obd->obd_name);
3820 target_abort_recovery(obd);
3824 CERROR("not supported cmd = %d for device %s\n",
3825 cmd, obd->obd_name);
/* Notify the child device stack that recovery has completed. */
3833 int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
3835 struct lu_device *ld = md2lu_dev(mdt->mdt_child);
3838 rc = ld->ld_ops->ldo_recovery_complete(env, ld);
/* obd-layer wrapper: build a temporary env and call mdt_postrecov(). */
3842 int mdt_obd_postrecov(struct obd_device *obd)
3847 rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
3850 rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev));
/* obd method table wiring the MDT into the generic obd class layer. */
3855 static struct obd_ops mdt_obd_device_ops = {
3856 .o_owner = THIS_MODULE,
3857 .o_connect = mdt_obd_connect,
3858 .o_reconnect = mdt_obd_reconnect,
3859 .o_disconnect = mdt_obd_disconnect,
3860 .o_init_export = mdt_init_export,
3861 .o_destroy_export = mdt_destroy_export,
3862 .o_iocontrol = mdt_iocontrol,
3863 .o_postrecov = mdt_obd_postrecov
/* lu_device_type fini hook; body largely elided in this listing. */
3867 static struct lu_device* mdt_device_fini(const struct lu_env *env,
3868 struct lu_device *d)
3870 struct mdt_device *m = mdt_dev(d);
/* lu_device_type free hook; frees the mdt_device (free call elided). */
3876 static void mdt_device_free(const struct lu_env *env, struct lu_device *d)
3878 struct mdt_device *m = mdt_dev(d);
/*
 * lu_device_type alloc hook: allocate an mdt_device, run full setup via
 * mdt_init0(), and register mdt_upcall as the md upcall handler.
 */
3883 static struct lu_device *mdt_device_alloc(const struct lu_env *env,
3884 struct lu_device_type *t,
3885 struct lustre_cfg *cfg)
3887 struct lu_device *l;
3888 struct mdt_device *m;
3894 l = &m->mdt_md_dev.md_lu_dev;
3895 rc = mdt_init0(env, m, t, cfg);
3901 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
/* Allocation-failure path. */
3903 l = ERR_PTR(-ENOMEM);
3908 * context key constructor/destructor
/* Allocate per-thread mdt_thread_info for the MD thread context. */
3910 static void *mdt_key_init(const struct lu_context *ctx,
3911 struct lu_context_key *key)
3913 struct mdt_thread_info *info;
3916 * check that no high order allocations are incurred.
3918 CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
3919 OBD_ALLOC_PTR(info);
3921 info = ERR_PTR(-ENOMEM);
/* Free the per-thread info allocated in mdt_key_init(). */
3925 static void mdt_key_fini(const struct lu_context *ctx,
3926 struct lu_context_key *key, void *data)
3928 struct mdt_thread_info *info = data;
/* Key for per-thread MDT state in MD service threads. */
3932 struct lu_context_key mdt_thread_key = {
3933 .lct_tags = LCT_MD_THREAD,
3934 .lct_init = mdt_key_init,
3935 .lct_fini = mdt_key_fini
/* Per-transaction-handle context key: mdt_txn_info ctor/dtor. */
3938 static void *mdt_txn_key_init(const struct lu_context *ctx,
3939 struct lu_context_key *key)
3941 struct mdt_txn_info *txi;
3944 * check that no high order allocations are incurred.
3946 CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
3949 txi = ERR_PTR(-ENOMEM);
3953 static void mdt_txn_key_fini(const struct lu_context *ctx,
3954 struct lu_context_key *key, void *data)
3956 struct mdt_txn_info *txi = data;
3960 struct lu_context_key mdt_txn_key = {
3961 .lct_tags = LCT_TX_HANDLE,
3962 .lct_init = mdt_txn_key_init,
3963 .lct_fini = mdt_txn_key_fini
/* Convenience accessor: the md user credentials of the current env. */
3966 struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
3968 return md_ucred(info->mti_env);
/* Register the MDT's context keys when the device type is loaded. */
3971 static int mdt_type_init(struct lu_device_type *t)
3975 rc = lu_context_key_register(&mdt_thread_key);
3977 rc = lu_context_key_register(&mdt_txn_key);
/* Unregister the context keys when the device type is unloaded. */
3981 static void mdt_type_fini(struct lu_device_type *t)
3983 lu_context_key_degister(&mdt_thread_key);
3984 lu_context_key_degister(&mdt_txn_key);
/* lu_device_type operations/descriptor tying the hooks above together. */
3987 static struct lu_device_type_operations mdt_device_type_ops = {
3988 .ldto_init = mdt_type_init,
3989 .ldto_fini = mdt_type_fini,
3991 .ldto_device_alloc = mdt_device_alloc,
3992 .ldto_device_free = mdt_device_free,
3993 .ldto_device_fini = mdt_device_fini
3996 static struct lu_device_type mdt_device_type = {
3997 .ldt_tags = LU_DEVICE_MD,
3998 .ldt_name = LUSTRE_MDT_NAME,
3999 .ldt_ops = &mdt_device_type_ops,
4000 .ldt_ctx_tags = LCT_MD_THREAD
/* /proc entries exposed per MDT obd device. */
4003 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
4004 { "uuid", lprocfs_rd_uuid, 0, 0 },
4005 { "recovery_status", lprocfs_obd_rd_recovery_status, 0, 0 },
4006 { "num_exports", lprocfs_rd_num_exports, 0, 0 },
/* /proc entries exposed once for the mdt module. */
4010 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
4011 { "num_refs", lprocfs_rd_numrefs, 0, 0 },
4015 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
/*
 * Module entry point: default the thread count and register the MDT obd
 * type with the class layer (registration args continue on elided lines).
 */
4017 static int __init mdt_mod_init(void)
4019 struct lprocfs_static_vars lvars;
4022 printk(KERN_INFO "Lustre: MetaData Target; info@clusterfs.com\n");
/* NOTE(review): unconditionally overrides any module-param value of
 * mdt_num_threads supplied at load time — confirm this is intended. */
4024 mdt_num_threads = MDT_NUM_THREADS;
4025 lprocfs_init_vars(mdt, &lvars);
4026 rc = class_register_type(&mdt_obd_device_ops, NULL,
4027 lvars.module_vars, LUSTRE_MDT_NAME,
/* Module exit point: unregister the MDT obd type. */
4033 static void __exit mdt_mod_exit(void)
4035 class_unregister_type(LUSTRE_MDT_NAME);
/*
 * Request-handler table machinery.  DEF_HNDL builds one mdt_handler
 * entry indexed by (opcode - base opcode); the per-family wrappers fix
 * the opcode prefix and base.  The _F variants attach a known request
 * format (RQF_*), the _0 variants leave the format NULL.
 */
4039 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt) \
4040 [prefix ## _ ## opc - prefix ## _ ## base] = { \
4042 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
4043 .mh_opc = prefix ## _ ## opc, \
4044 .mh_flags = flags, \
4049 #define DEF_MDT_HNDL(flags, name, fn, fmt) \
4050 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
4052 #define DEF_SEQ_HNDL(flags, name, fn, fmt) \
4053 DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, fmt)
4055 #define DEF_FLD_HNDL(flags, name, fn, fmt) \
4056 DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, fmt)
4058 * Request with a format known in advance
4060 #define DEF_MDT_HNDL_F(flags, name, fn) \
4061 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
4063 #define DEF_SEQ_HNDL_F(flags, name, fn) \
4064 DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)
4066 #define DEF_FLD_HNDL_F(flags, name, fn) \
4067 DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, &RQF_FLD_ ## name)
4069 * Request with a format we do not yet know
4071 #define DEF_MDT_HNDL_0(flags, name, fn) \
4072 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
/* MDS opcode handlers.  HABEO_CORPUS = needs body object, HABEO_REFERO =
 * needs reply pack, MUTABOR = modifies the filesystem. */
4074 static struct mdt_handler mdt_mds_ops[] = {
4075 DEF_MDT_HNDL_F(0, CONNECT, mdt_connect),
4076 DEF_MDT_HNDL_F(0, DISCONNECT, mdt_disconnect),
4077 DEF_MDT_HNDL_F(0 |HABEO_REFERO, GETSTATUS, mdt_getstatus),
4078 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR, mdt_getattr),
4079 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
4080 DEF_MDT_HNDL_F(HABEO_CORPUS|MUTABOR, SETXATTR, mdt_setxattr),
4081 DEF_MDT_HNDL_F(HABEO_CORPUS, GETXATTR, mdt_getxattr),
4082 DEF_MDT_HNDL_F(0 |HABEO_REFERO, STATFS, mdt_statfs),
4083 DEF_MDT_HNDL_F(0 |MUTABOR,
4085 DEF_MDT_HNDL_F(HABEO_CORPUS , CLOSE, mdt_close),
4086 DEF_MDT_HNDL_F(HABEO_CORPUS , DONE_WRITING, mdt_done_writing),
4087 DEF_MDT_HNDL_F(0 |HABEO_REFERO, PIN, mdt_pin),
4088 DEF_MDT_HNDL_0(0, SYNC, mdt_sync),
4089 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR, mdt_is_subdir),
4090 DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_quotacheck_handle),
4091 DEF_MDT_HNDL_0(0, QUOTACTL, mdt_quotactl_handle),
4092 DEF_MDT_HNDL_0(0 |HABEO_REFERO, RENEW_CAPA, mdt_renew_capa)
4095 #define DEF_OBD_HNDL(flags, name, fn) \
4096 DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
/* Generic OBD opcode handlers (ping, log cancel, quota callback). */
4099 static struct mdt_handler mdt_obd_ops[] = {
4100 DEF_OBD_HNDL(0, PING, mdt_obd_ping),
4101 DEF_OBD_HNDL(0, LOG_CANCEL, mdt_obd_log_cancel),
4102 DEF_OBD_HNDL(0, QC_CALLBACK, mdt_obd_qc_callback)
4105 #define DEF_DLM_HNDL_0(flags, name, fn) \
4106 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
4107 #define DEF_DLM_HNDL_F(flags, name, fn) \
4108 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
/* LDLM opcode handlers; HABEO_CLAVIS = needs the lock handle. */
4110 static struct mdt_handler mdt_dlm_ops[] = {
4111 DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
4112 DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT, mdt_convert),
4113 DEF_DLM_HNDL_0(0, BL_CALLBACK, mdt_bl_callback),
4114 DEF_DLM_HNDL_0(0, CP_CALLBACK, mdt_cp_callback)
/* llog handlers: table is empty here (entries elided or none). */
4117 static struct mdt_handler mdt_llog_ops[] = {
4120 #define DEF_SEC_CTX_HNDL(name, fn) \
4121 DEF_HNDL(SEC_CTX, INIT, _NET, 0, name, fn, NULL)
/* Security-context negotiation handlers (all routed to one function). */
4123 static struct mdt_handler mdt_sec_ctx_ops[] = {
4124 DEF_SEC_CTX_HNDL(INIT, mdt_sec_ctx_handle),
4125 DEF_SEC_CTX_HNDL(INIT_CONT, mdt_sec_ctx_handle),
4126 DEF_SEC_CTX_HNDL(FINI, mdt_sec_ctx_handle)
/*
 * Opcode-slice tables: map each opcode range to its handler array.
 * mdt_regular_handlers serves the main request portal; the readpage,
 * seq and fld services below each get their own reduced table.
 */
4129 static struct mdt_opc_slice mdt_regular_handlers[] = {
4131 .mos_opc_start = MDS_GETATTR,
4132 .mos_opc_end = MDS_LAST_OPC,
4133 .mos_hs = mdt_mds_ops
4136 .mos_opc_start = OBD_PING,
4137 .mos_opc_end = OBD_LAST_OPC,
4138 .mos_hs = mdt_obd_ops
4141 .mos_opc_start = LDLM_ENQUEUE,
4142 .mos_opc_end = LDLM_LAST_OPC,
4143 .mos_hs = mdt_dlm_ops
4146 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
4147 .mos_opc_end = LLOG_LAST_OPC,
4148 .mos_hs = mdt_llog_ops
4151 .mos_opc_start = SEC_CTX_INIT,
4152 .mos_opc_end = SEC_LAST_OPC,
4153 .mos_hs = mdt_sec_ctx_ops
/* Handlers reachable through the readpage portal. */
4160 static struct mdt_handler mdt_readpage_ops[] = {
4161 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
4162 #ifdef HAVE_SPLIT_SUPPORT
4163 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
4167 * XXX: this is ugly and should be fixed one day, see mdc_close() for
4168 * detailed comments. --umka
4170 DEF_MDT_HNDL_F(HABEO_CORPUS, CLOSE, mdt_close),
4171 DEF_MDT_HNDL_F(HABEO_CORPUS, DONE_WRITING, mdt_done_writing),
4174 static struct mdt_opc_slice mdt_readpage_handlers[] = {
4176 .mos_opc_start = MDS_GETATTR,
4177 .mos_opc_end = MDS_LAST_OPC,
4178 .mos_hs = mdt_readpage_ops
/* Sequence-service handler: delegates to the generic seq_query(). */
4185 static struct mdt_handler mdt_seq_ops[] = {
4186 DEF_SEQ_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))seq_query)
4189 static struct mdt_opc_slice mdt_seq_handlers[] = {
4191 .mos_opc_start = SEQ_QUERY,
4192 .mos_opc_end = SEQ_LAST_OPC,
4193 .mos_hs = mdt_seq_ops
/* FLD-service handler: delegates to the generic fld_query(). */
4200 static struct mdt_handler mdt_fld_ops[] = {
4201 DEF_FLD_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))fld_query)
4204 static struct mdt_opc_slice mdt_fld_handlers[] = {
4206 .mos_opc_start = FLD_QUERY,
4207 .mos_opc_end = FLD_LAST_OPC,
4208 .mos_hs = mdt_fld_ops
/* Module metadata and entry/exit registration. */
4215 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4216 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
4217 MODULE_LICENSE("GPL");
/* Load-time tunable; note mdt_mod_init() resets it to MDT_NUM_THREADS. */
4219 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
4220 "number of mdt service threads to start");
4222 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);