1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/mdt/mdt_handler.c
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
13 * Author: Huang Hua <huanghua@clusterfs.com>
15 * This file is part of the Lustre file system, http://www.lustre.org
16 * Lustre is a trademark of Cluster File Systems, Inc.
18 * You may have signed or agreed to another license before downloading
19 * this software. If so, you are bound by the terms and conditions
20 * of that agreement, and the following does not apply to you. See the
21 * LICENSE file included with this distribution for more information.
23 * If you did not agree to a different license, then this copy of Lustre
24 * is open source software; you can redistribute it and/or modify it
25 * under the terms of version 2 of the GNU General Public License as
26 * published by the Free Software Foundation.
28 * In either case, Lustre is distributed in the hope that it will be
29 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 * license text for more details.
35 # define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_MDS
39 #include <linux/module.h>
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
44 * struct OBD_{ALLOC,FREE}*()
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
55 #include <dt_object.h>
56 #include <lustre_mds.h>
57 #include "mdt_internal.h"
58 #include <linux/lustre_acl.h>
60 * Initialized in mdt_mod_init().
62 unsigned long mdt_num_threads;
64 /* ptlrpc request handler for MDT. All handlers are
65 * grouped into several slices - struct mdt_opc_slice,
66 * and stored in an array - mdt_handlers[].
69 /* The name of this handler. */
71 /* Fail id for this handler, checked at the beginning of this handler.*/
73 /* Operation code for this handler */
75 /* flags are listed in enum mdt_handler_flags below. */
77 /* The actual handler function to execute. */
78 int (*mh_act)(struct mdt_thread_info *info);
79 /* Request format for this request. */
80 const struct req_format *mh_fmt;
83 enum mdt_handler_flags {
85 * struct mdt_body is passed in the incoming message, and object
86 * identified by this fid exists on disk.
88 * "habeo corpus" == "I have a body"
90 HABEO_CORPUS = (1 << 0),
92 * struct ldlm_request is passed in the incoming message.
94 * "habeo clavis" == "I have a key"
96 HABEO_CLAVIS = (1 << 1),
98 * this request has fixed reply format, so that reply message can be
99 * packed by generic code.
101 * "habeo refero" == "I have a reply"
103 HABEO_REFERO = (1 << 2),
105 * this request will modify something, so check whether the filesystem
106 * is readonly or not, then return -EROFS to client asap if necessary.
108 * "mutabor" == "I shall modify"
113 struct mdt_opc_slice {
116 struct mdt_handler *mos_hs;
119 static struct mdt_opc_slice mdt_handlers[];
120 static struct mdt_opc_slice mdt_readpage_handlers[];
122 static int mdt_handle (struct ptlrpc_request *req);
123 static struct mdt_device *mdt_dev (struct lu_device *d);
124 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
126 static struct lu_object_operations mdt_obj_ops;
/* MDS_GETSTATUS handler: ask the underlying md_device for the root fid
 * and pack it into the reply mdt_body, marking OBD_MD_FLID valid.
 * NOTE(review): this excerpt elides lines (braces, error paths, RETURN). */
129 static int mdt_getstatus(struct mdt_thread_info *info)
131         struct md_device *next = info->mti_mdt->mdt_child;
133         struct mdt_body *body;
137         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
140         body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
141         result = next->md_ops->mdo_root_get(info->mti_ctxt,
144         body->valid |= OBD_MD_FLID;
/* MDS_STATFS handler: delegate to the child device's mdo_statfs() and
 * pack the result into the reply's obd_statfs buffer.
 * NOTE(review): excerpt elides lines; error handling not fully visible. */
150 static int mdt_statfs(struct mdt_thread_info *info)
152         struct md_device *next = info->mti_mdt->mdt_child;
153         struct obd_statfs *osfs;
158         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
161         osfs = req_capsule_server_get(&info->mti_pill, &RMF_OBD_STATFS);
162         /* XXX max_age optimisation is needed here. See mds_statfs */
163         result = next->md_ops->mdo_statfs(info->mti_ctxt,
164                                           next, &info->mti_sfs);
165         statfs_pack(osfs, &info->mti_sfs);
/* Translate on-disk attributes (lu_attr) into the wire format (mdt_body).
 * Size/blocks/atime bits are only set for non-regular files here —
 * presumably regular-file sizes come from elsewhere (OSTs); confirm
 * against the elided lines of this excerpt. */
171 void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
172                         const struct lu_fid *fid)
174         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID |
175                     OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
176                     OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
178         if (!S_ISREG(attr->la_mode))
179                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
182         b->atime = attr->la_atime;
183         b->mtime = attr->la_mtime;
184         b->ctime = attr->la_ctime;
185         b->mode = attr->la_mode;
186         b->size = attr->la_size;
187         b->blocks = attr->la_blocks;
188         b->uid = attr->la_uid;
189         b->gid = attr->la_gid;
190         b->flags = attr->la_flags;
191         b->nlink = attr->la_nlink;
/* The fid is packed on an elided line; only the valid bit is visible. */
195         b->valid |= OBD_MD_FLID;
196         CDEBUG(D_INODE, ""DFID3": nlink=%d, mode=%o, size="LPU64"\n",
197                PFID3(fid), b->nlink, b->mode, b->size);
/* True when the request wants striping EA and the object can carry one:
 * a regular file with OBD_MD_FLEASIZE or a directory with OBD_MD_FLDIREA. */
201 static inline int mdt_body_has_lov(const struct lu_attr *la,
202                                    const struct mdt_body *body)
204         return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
205                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
/* Core getattr: fetch attributes of @o from the lower layer and pack the
 * reply body, plus (depending on what the client asked for): LOV EA,
 * symlink target, max md/cookie sizes, and the POSIX ACL xattr.  Finally
 * shrink the reply buffers to the actually-used EA/ACL sizes.
 * @offset is the reply-buffer offset used by lustre_shrink_reply().
 * NOTE(review): many lines are elided in this excerpt (braces, GOTO
 * labels such as "shrink", some error branches). */
208 static int mdt_getattr_internal(struct mdt_thread_info *info,
209                                 struct mdt_object *o, int offset)
211         struct md_object *next = mdt_object_child(o);
212         const struct mdt_body *reqbody = info->mti_body;
213         struct ptlrpc_request *req = mdt_info_req(info);
214         struct md_attr *ma = &info->mti_attr;
215         struct lu_attr *la = &ma->ma_attr;
216         struct req_capsule *pill = &info->mti_pill;
217         const struct lu_context *ctxt = info->mti_ctxt;
218         struct mdt_body *repbody;
224         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
227         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
228         repbody->eadatasize = 0;
229         repbody->aclsize = 0;
/* The MDT_MD reply buffer doubles as scratch space for EA/symlink data. */
231         ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
232         ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER);
234         rc = mo_attr_get(ctxt, next, &info->mti_attr);
235         if (rc == -EREMOTE) {
236                 /* This object is located on remote node.*/
237                 repbody->fid1 = *mdt_object_fid(o);
238                 repbody->valid |= OBD_MD_FLID;
239                 GOTO(shrink, rc = 0);
241                 CERROR("getattr error for "DFID3": %d\n",
242                        PFID3(mdt_object_fid(o)), rc);
246         if (ma->ma_valid & MA_INODE)
247                 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
249         if (mdt_body_has_lov(la, reqbody)) {
250                 if (ma->ma_lmm_size && ma->ma_valid & MA_LOV) {
251                         CDEBUG(D_INODE, "packing ea for "DFID3"\n",
252                                PFID3(mdt_object_fid(o)));
253                         mdt_dump_lmm(D_INFO, ma->ma_lmm);
254                         repbody->eadatasize = ma->ma_lmm_size;
255                         repbody->valid |= OBD_MD_FLEASIZE;
257         } else if (S_ISLNK(la->la_mode) &&
258                    reqbody->valid & OBD_MD_LINKNAME) {
/* mo_readlink() returns the link length on success (see rc use below). */
259                 rc = mo_readlink(ctxt, next, ma->ma_lmm, ma->ma_lmm_size);
261                         CERROR("readlink failed: %d\n", rc);
264                         repbody->valid |= OBD_MD_LINKNAME;
265                         repbody->eadatasize = rc + 1;
266                         ((char*)ma->ma_lmm)[rc] = 0; /* NULL terminate */
267                         CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
273         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
274                 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
275                 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
276                 repbody->valid |= OBD_MD_FLMODEASIZE;
277                 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & MAX_COOKIE"
280                        repbody->max_cookiesize);
283 #ifdef CONFIG_FS_POSIX_ACL
284         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
285             (reqbody->valid & OBD_MD_FLACL)) {
286                 buffer = req_capsule_server_get(pill, &RMF_ACL);
287                 length = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
289                 rc = mo_xattr_get(ctxt, next, buffer,
290                                   length, XATTR_NAME_ACL_ACCESS);
/* Missing ACL support/data is not an error for getattr. */
292                         if (rc == -ENODATA || rc == -EOPNOTSUPP)
295                                 CERROR("got acl size: %d\n", rc);
297                         repbody->aclsize = rc;
298                         repbody->valid |= OBD_MD_FLACL;
305         /* FIXME: determine the offset of MDT_MD. but it does not work */
307         if (req_capsule_has_field(pill, &RMF_DLM_REP)) {
/* Trim unused space from the EA and ACL reply buffers. */
312         lustre_shrink_reply(req, offset, repbody->eadatasize, 1);
313         if (repbody->eadatasize)
315         lustre_shrink_reply(req, offset, repbody->aclsize, 0);
/* MDS_GETATTR handler: size the MDT_MD reply buffer, pack the reply
 * capsule, then delegate to mdt_getattr_internal() with reply offset 1.
 * Requires that the object from the request body exists on disk. */
319 static int mdt_getattr(struct mdt_thread_info *info)
323         LASSERT(info->mti_object != NULL);
324         LASSERT(lu_object_assert_exists(info->mti_ctxt,
325                                         &info->mti_object->mot_obj.mo_lu));
329         req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD,
330                              RCL_SERVER, info->mti_mdt->mdt_max_mdsize);
332         result = req_capsule_pack(&info->mti_pill);
336         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
339         result = mdt_getattr_internal(info, info->mti_object, 1);
345  * UPDATE lock should be taken against parent, and be release before exit;
346  * child_bits lock should be taken against child, and be returned back:
347  * (1)normal request should release the child lock;
348  * (2)intent request will grant the lock to client.
/* getattr-by-name with locking: lock parent (UPDATE, CR), look up the
 * child fid by @name, then find+lock the child with @child_bits and run
 * mdt_getattr_internal() on it.  An empty name means the parent lives on
 * another node and only the child is handled here.
 * NOTE(review): excerpt elides lines, including how the child object is
 * obtained in the empty-name branch and several cleanup paths. */
350 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
351                                  struct mdt_lock_handle *lhc,
353                                  struct ldlm_reply *ldlm_rep)
355         struct mdt_object *parent = info->mti_object;
356         struct mdt_object *child;
357         struct md_object *next = mdt_object_child(info->mti_object);
358         struct lu_fid *child_fid = &info->mti_tmp_fid1;
361         struct mdt_lock_handle *lhp;
364         LASSERT(info->mti_object != NULL);
365         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
369         CDEBUG(D_INODE, "getattr with lock for "DFID3"/%s, ldlm_rep = %p\n",
370                PFID3(mdt_object_fid(parent)), name, ldlm_rep);
372         intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
373         if (strlen(name) == 0) {
374                 /* only getattr on the child. parent is on another node. */
375                 intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
377                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID3
379                        PFID3(mdt_object_fid(child)), ldlm_rep);
381                 mdt_lock_handle_init(lhc);
382                 lhc->mlh_mode = LCK_CR;
383                 result = mdt_object_lock(info, child, lhc, child_bits);
385                         /* finally, we can get attr for child. */
386                         result = mdt_getattr_internal(info, child,
389                                 mdt_object_unlock(info, child, lhc, 1);
394         /*step 1: lock parent */
395         lhp = &info->mti_lh[MDT_LH_PARENT];
396         lhp->mlh_mode = LCK_CR;
397         result = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
401         /*step 2: lookup child's fid by name */
402         result = mdo_lookup(info->mti_ctxt, next, name, child_fid);
404                 if (result == -ENOENT)
405                         intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
406                 GOTO(out_parent, result);
408         intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
410          *step 3: find the child object by fid & lock it.
411          * regardless if it is local or remote.
413         mdt_lock_handle_init(lhc);
414         lhc->mlh_mode = LCK_CR;
415         child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
417                 GOTO(out_parent, result = PTR_ERR(child));
419         /* finally, we can get attr for child. */
420         result = mdt_getattr_internal(info, child,
423                 mdt_object_unlock(info, child, lhc, 1);
/* Debug-only sanity check: the returned lock's resource must match the
 * child fid. */
425                 struct ldlm_lock *lock;
426                 struct ldlm_res_id *res_id;
427                 lock = ldlm_handle2lock(&lhc->mlh_lh);
429                         res_id = &lock->l_resource->lr_name;
430                         LDLM_DEBUG(lock, "we will return this lock client\n");
431                         LASSERTF(fid_res_name_eq(mdt_object_fid(child),
432                                                  &lock->l_resource->lr_name),
433                                  "Lock res_id: %lu/%lu/%lu, Fid: "DFID3".\n",
434                                  (unsigned long)res_id->name[0],
435                                  (unsigned long)res_id->name[1],
436                                  (unsigned long)res_id->name[2],
437                                  PFID3(mdt_object_fid(child)));
442         mdt_object_put(info->mti_ctxt, child);
446         mdt_object_unlock(info, parent, lhp, 1);
451 /* normal handler: should release the child lock */
/* MDS_GETATTR_NAME handler: non-intent path — take the child lock via
 * mdt_getattr_name_lock() and drop it before replying. */
452 static int mdt_getattr_name(struct mdt_thread_info *info)
454         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
459         req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD,
460                              RCL_SERVER, info->mti_mdt->mdt_max_mdsize);
462         rc = req_capsule_pack(&info->mti_pill);
465         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
466         if (lustre_handle_is_used(&lhc->mlh_lh)) {
467                 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
468                 lhc->mlh_lh.cookie = 0;
473 static struct lu_device_operations mdt_lu_ops;
/* True iff @d is an MDT device (identified by its ops vector). */
475 static int lu_device_is_mdt(struct lu_device *d)
477         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
/* Downcast a generic lu_device to its enclosing mdt_device. */
480 static struct mdt_device *mdt_dev(struct lu_device *d)
482         LASSERT(lu_device_is_mdt(d));
483         return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
/* MDS_CONNECT handler: let the generic target code establish the export,
 * then record the mdt_device for this request's thread info. */
486 static int mdt_connect(struct mdt_thread_info *info)
489         struct ptlrpc_request *req;
491         req = mdt_info_req(info);
492         result = target_handle_connect(req, mdt_handle);
494                 LASSERT(req->rq_export != NULL);
495                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
/* MDS_DISCONNECT handler: delegate entirely to generic target code. */
500 static int mdt_disconnect(struct mdt_thread_info *info)
502         return target_handle_disconnect(mdt_info_req(info));
/* Push the pages described by @rdpg to the client via a PUT-source bulk
 * transfer; waits (obd_timeout/4) for completion and evicts the export on
 * bulk failure.  NOTE(review): excerpt elides lines, incl. the labels
 * jumped to by GOTO(abort_bulk/out, ...). */
505 static int mdt_sendpage(struct mdt_thread_info *info,
506                         struct lu_rdpg *rdpg)
508         struct ptlrpc_request *req = mdt_info_req(info);
509         struct ptlrpc_bulk_desc *desc;
510         struct l_wait_info lwi;
517         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
520                 GOTO(out, rc = -ENOMEM);
/* Attach each page; the last page may be partial (tmpcount < page size). */
522         for (i = 0, tmpcount = rdpg->rp_count;
523              i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
524                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
525                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
528         LASSERT(desc->bd_nob == rdpg->rp_count);
529         rc = ptlrpc_start_bulk_transfer(desc);
533         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
534                 GOTO(abort_bulk, rc);
536         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
537         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
538         LASSERT (rc == 0 || rc == -ETIMEDOUT);
541                 if (desc->bd_success &&
542                     desc->bd_nob_transferred == rdpg->rp_count)
545                         rc = -ETIMEDOUT; /* XXX should this be a different errno? */
548         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
549                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
550                   desc->bd_nob_transferred, rdpg->rp_count,
551                   req->rq_export->exp_client_uuid.uuid,
552                   req->rq_export->exp_connection->c_remote_uuid.uuid);
554         class_fail_export(req->rq_export);
558         ptlrpc_abort_bulk(desc);
560         ptlrpc_free_bulk(desc);
/* MDS_READPAGE handler: allocate rp_npages pages, let the lower layer
 * fill them with directory data (mo_readpage), then bulk-send them with
 * mdt_sendpage().  Request fields are overloaded: body->size is the hash
 * offset, body->nlink the byte count.  All pages freed on exit. */
565 static int mdt_readpage(struct mdt_thread_info *info)
567         struct mdt_object *object = info->mti_object;
568         struct lu_rdpg *rdpg = &info->mti_rdpg;
569         struct mdt_body *reqbody;
570         struct mdt_body *repbody;
575         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
578         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
579         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
580         if (reqbody == NULL || repbody == NULL)
584          * prepare @rdpg before calling lower layers and transfer itself. Here
585          * reqbody->size contains offset of where to start to read and
586          * reqbody->nlink contains number bytes to read.
588         rdpg->rp_hash = reqbody->size;
/* Guard against truncation if rp_hash is narrower than __u64. */
589         if ((__u64)rdpg->rp_hash != reqbody->size) {
590                 CERROR("Invalid hash: %#llx != %#llx\n",
591                        (__u64)rdpg->rp_hash, reqbody->size);
594         rdpg->rp_count = reqbody->nlink;
595         rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>> CFS_PAGE_SHIFT;
596         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
597         if (rdpg->rp_pages == NULL)
600         for (i = 0; i < rdpg->rp_npages; ++i) {
601                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
602                 if (rdpg->rp_pages[i] == NULL)
603                         GOTO(free_rdpg, rc = -ENOMEM);
606         /* call lower layers to fill allocated pages with directory data */
607         rc = mo_readpage(info->mti_ctxt, mdt_object_child(object), rdpg);
611         /* send pages to client */
612         rc = mdt_sendpage(info, rdpg);
/* free_rdpg cleanup: release any pages that were allocated. */
616         for (i = 0; i < rdpg->rp_npages; i++)
617                 if (rdpg->rp_pages[i] != NULL)
618                         __free_pages(rdpg->rp_pages[i], 0);
619         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
/* Common reint path: unpack the reint record for opcode @op, then apply
 * it via mdt_reint_rec(). */
623 static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
628         OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_UNPACK, -EFAULT);
630         rc = mdt_reint_unpack(info, op);
632                 rc = mdt_reint_rec(info);
/* Read the reint opcode from the request and, if supported, extend the
 * capsule with that opcode's request format from @fmt. Returns the opcode
 * (negative errno on elided error paths — confirm, lines missing). */
638 static long mdt_reint_opcode(struct mdt_thread_info *info,
639                              const struct req_format **fmt)
645         ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
648         DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
649         if (opc < REINT_MAX && fmt[opc] != NULL)
650                 req_capsule_extend(&info->mti_pill, fmt[opc]);
652                 CERROR("Unsupported opc: %ld\n", opc);
/* MDS_REINT handler: map the opcode to its request format, then run the
 * common reint path. */
657 static int mdt_reint(struct mdt_thread_info *info)
662         static const struct req_format *reint_fmts[REINT_MAX] = {
663                 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
664                 [REINT_CREATE]  = &RQF_MDS_REINT_CREATE,
665                 [REINT_LINK]    = &RQF_MDS_REINT_LINK,
666                 [REINT_UNLINK]  = &RQF_MDS_REINT_UNLINK,
667                 [REINT_RENAME]  = &RQF_MDS_REINT_RENAME,
668                 [REINT_OPEN]    = &RQF_MDS_REINT_OPEN
673         opc = mdt_reint_opcode(info, reint_fmts);
675                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
677                 rc = mdt_reint_internal(info, opc);
684 /* TODO these two methods not available now. */
686 /* this should sync the whole device */
/* Stub: bodies elided in this excerpt (and per the TODO, unimplemented). */
687 static int mdt_device_sync(struct mdt_thread_info *info)
692 /* this should sync this object */
693 static int mdt_object_sync(struct mdt_thread_info *info)
/* MDS_SYNC handler: fid_seq == 0 means sync the whole device; otherwise
 * unpack the body, sync the object, and return its fresh attributes. */
698 static int mdt_sync(struct mdt_thread_info *info)
700         struct req_capsule *pill = &info->mti_pill;
701         struct mdt_body *body;
705         /* The fid may be zero, so we req_capsule_set manually */
706         req_capsule_set(pill, &RQF_MDS_SYNC);
708         body = req_capsule_client_get(pill, &RMF_MDT_BODY);
712         if (fid_seq(&body->fid1) == 0) {
713                 /* sync the whole device */
714                 rc = req_capsule_pack(pill);
716                         rc = mdt_device_sync(info);
719                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS | HABEO_REFERO);
721                         rc = mdt_object_sync(info);
723                         struct md_object *next;
724                         const struct lu_fid *fid;
725                         next = mdt_object_child(info->mti_object);
726                         fid = mdt_object_fid(info->mti_object);
727                         rc = mo_attr_get(info->mti_ctxt,
728                                          next, &info->mti_attr);
730                                 body = req_capsule_server_get(pill,
732                                 mdt_pack_attr2body(body,
733                                                    &info->mti_attr.ma_attr,
/* Quota handlers: bodies elided in this excerpt. */
742 static int mdt_handle_quotacheck(struct mdt_thread_info *info)
747 static int mdt_handle_quotactl(struct mdt_thread_info *info)
753  * OBD PING and other handlers.
/* OBD_PING handler: delegate to the generic target ping code. */
756 static int mdt_obd_ping(struct mdt_thread_info *info)
760         result = target_handle_ping(mdt_info_req(info));
/* Log-cancel / quota-callback stubs: bodies elided in this excerpt. */
764 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
769 static int mdt_obd_qc_callback(struct mdt_thread_info *info)
/* Server-side DLM callback suite used by mdt_enqueue(). */
779 static struct ldlm_callback_suite cbs = {
780         .lcs_completion = ldlm_server_completion_ast,
781         .lcs_blocking   = ldlm_server_blocking_ast,
/* LDLM_ENQUEUE handler: hand the already-unpacked dlm request to the
 * generic enqueue path with the MDT namespace and callback suite. */
785 static int mdt_enqueue(struct mdt_thread_info *info)
788         struct ptlrpc_request *req;
791          * info->mti_dlm_req already contains swapped and (if necessary)
792          * converted dlm request.
794         LASSERT(info->mti_dlm_req != NULL);
796         req = mdt_info_req(info);
797         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
798         result = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
799                                       req, info->mti_dlm_req, &cbs);
800         return result ? : req->rq_status;
/* LDLM_CONVERT handler: delegate to the generic lock-convert path. */
803 static int mdt_convert(struct mdt_thread_info *info)
806         struct ptlrpc_request *req;
808         LASSERT(info->mti_dlm_req);
809         req = mdt_info_req(info);
810         result = ldlm_handle_convert0(req, info->mti_dlm_req);
811         return result ? : req->rq_status;
/* Blocking/completion AST handlers: a server never receives these, so
 * both just log an error (return values elided in this excerpt). */
814 static int mdt_bl_callback(struct mdt_thread_info *info)
816         CERROR("bl callbacks should not happen on MDS\n");
821 static int mdt_cp_callback(struct mdt_thread_info *info)
823         CERROR("cp callbacks should not happen on MDS\n");
829  * Build (DLM) resource name from fid.
/* Maps fid (seq, oid, ver) onto the three res_id name slots; remaining
 * slots zeroed.  (Return statement elided in this excerpt.) */
831 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
832                                        struct ldlm_res_id *name)
834         memset(name, 0, sizeof *name);
835         name->name[0] = fid_seq(f);
836         name->name[1] = fid_oid(f);
837         name->name[2] = fid_ver(f);
841 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
/* Takes a server-side IBITS lock on the resource derived from @f; maps
 * any enqueue failure to -EIO. */
842 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
843              struct lustre_handle *lh, ldlm_mode_t mode,
844              ldlm_policy_data_t *policy,
845              struct ldlm_res_id *res_id)
847         int flags = 0; /*XXX: LDLM_FL_LOCAL_ONLY?*/
854         /* FIXME: is that correct to have @flags=0 here? */
855         rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, res_id),
856                               LDLM_IBITS, policy, mode, &flags,
857                               ldlm_blocking_ast, ldlm_completion_ast, NULL,
858                               NULL, NULL, 0, NULL, lh);
859         return rc == ELDLM_OK ? 0 : -EIO;
862 /* just call ldlm_lock_decref() if decref,
863  * else we only call ptlrpc_save_lock() to save this lock in req.
864  * when transaction committed, req will be released and lock will be released */
865 void fid_unlock(struct ptlrpc_request *req, const struct lu_fid *f,
866                 struct lustre_handle *lh, ldlm_mode_t mode, int decref)
869         /* FIXME: this is debug stuff, remove it later. */
870         struct ldlm_lock *lock = ldlm_handle2lock(lh);
872                 CERROR("invalid lock handle "LPX64, lh->cookie);
875         LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
/* decref drops the lock now; otherwise it is pinned until the request's
 * transaction commits (see comment above). */
879                 ldlm_lock_decref(lh, mode);
881                 ptlrpc_save_lock(req, lh, mode);
/* Downcast a generic lu_object to its enclosing mdt_object. */
884 static struct mdt_object *mdt_obj(struct lu_object *o)
886         LASSERT(lu_device_is_mdt(o->lo_dev));
887         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
/* Look up (or instantiate) the object for @f in the site's object index.
 * NOTE(review): error handling of lu_object_find() is elided here. */
890 struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
891                                    struct mdt_device *d,
892                                    const struct lu_fid *f)
895         struct mdt_object *m;
898         o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
900                 m = (struct mdt_object *)o;
/* Take an inodebits lock with bits @ibits on @o using the mode already
 * set in @lh; the handle must be fresh (unused, mode != MINMODE). */
906 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
907                     struct mdt_lock_handle *lh, __u64 ibits)
909         ldlm_policy_data_t *policy = &info->mti_policy;
910         struct ldlm_res_id *res_id = &info->mti_res_id;
911         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
915         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
916         LASSERT(lh->mlh_mode != LCK_MINMODE);
918         policy->l_inodebits.bits = ibits;
920         rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, policy, res_id);
/* Release (decref) or save-until-commit the lock in @lh — see
 * fid_unlock(); clears the handle so it can be reused. */
924 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
925                        struct mdt_lock_handle *lh, int decref)
927         struct ptlrpc_request *req = mdt_info_req(info);
930         if (lustre_handle_is_used(&lh->mlh_lh)) {
931                 fid_unlock(req, mdt_object_fid(o),
932                            &lh->mlh_lh, lh->mlh_mode, decref);
933                 lh->mlh_lh.cookie = 0;
/* find + lock in one step; on lock failure the object reference is
 * dropped (return of the error is on an elided line). */
938 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
939                                         const struct lu_fid *f,
940                                         struct mdt_lock_handle *lh,
943         struct mdt_object *o;
945         o = mdt_object_find(info->mti_ctxt, info->mti_mdt, f);
949                 result = mdt_object_lock(info, o, lh, ibits);
951                         mdt_object_put(info->mti_ctxt, o);
/* Convenience: unlock then drop the object reference. */
958 void mdt_object_unlock_put(struct mdt_thread_info * info,
959                            struct mdt_object * o,
960                            struct mdt_lock_handle *lh,
963         mdt_object_unlock(info, o, lh, decref);
964         mdt_object_put(info->mti_ctxt, o);
/* Locate the handler for @opc in the slice table @supported; each slice
 * covers a contiguous [mos_opc_start, mos_opc_end) opcode range. Returns
 * NULL for an unsupported opcode. */
967 static struct mdt_handler *mdt_handler_find(__u32 opc,
968                                             struct mdt_opc_slice *supported)
970         struct mdt_opc_slice *s;
971         struct mdt_handler *h;
974         for (s = supported; s->mos_hs != NULL; s++) {
975                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
976                         h = s->mos_hs + (opc - s->mos_opc_start);
978                                 LASSERT(h->mh_opc == opc);
980                                 h = NULL; /* unsupported opc */
/* Last xid recorded in this export's client data (mdt_client_data). */
987 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
989         return req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid;
/* Placeholder for old-client resource-name conversion (not implemented). */
992 static int mdt_lock_resname_compat(struct mdt_device *m,
993                                    struct ldlm_request *req)
995         /* XXX something... later. */
/* Placeholder for old-client lock-reply conversion (not implemented). */
999 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
1001         /* XXX something... later. */
1006  * Generic code handling requests that have struct mdt_body passed in:
1008  * - extract mdt_body from request and save it in @info, if present;
1010  * - create lu_object, corresponding to the fid in mdt_body, and save it in
1013  * - if HABEO_CORPUS flag is set for this request type check whether object
1014  *   actually exists on storage (lu_object_exists()).
1017 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1019         const struct mdt_body *body;
1020         struct mdt_object *obj;
1021         const struct lu_context *ctx;
1022         struct req_capsule *pill;
1025         ctx = info->mti_ctxt;
1026         pill = &info->mti_pill;
1028         body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1030         if (fid_is_sane(&body->fid1)) {
1031                 obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
/* HABEO_CORPUS requests demand an on-disk object; drop the ref and fail
 * (error value on an elided line) when it does not exist. */
1033                         if ((flags & HABEO_CORPUS) &&
1034                             !lu_object_exists(ctx,
1035                                               &obj->mot_obj.mo_lu)) {
1036                                 mdt_object_put(ctx, obj);
1039                                 info->mti_object = obj;
1043                         result = PTR_ERR(obj);
1045                 CERROR("Invalid fid: "DFID3"\n", PFID3(&body->fid1));
/* Request preprocessing: unpack the mdt_body (if the format has one) and,
 * for HABEO_REFERO requests, pack the fixed-format reply. */
1053 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1055         struct req_capsule *pill;
1059         pill = &info->mti_pill;
1061         if (req_capsule_has_field(pill, &RMF_MDT_BODY))
1062                 result = mdt_body_unpack(info, flags);
1066         if (result == 0 && (flags & HABEO_REFERO))
1067                 result = req_capsule_pack(pill);
1072 /* FIXME: fake untill journal callback is OK.*/
1073 struct lu_context_key mdt_txn_key;
/* Post-handler bookkeeping: stamp the reply with the transaction number,
 * last xid and last_committed so the client can track recovery state.
 * Skipped entirely when the request/export context is incomplete or
 * MDT_NONEED_TANSNO was set.  NOTE(review): excerpt elides lines,
 * including where mti_transno is validated against replays. */
1075 int mdt_update_last_transno(struct mdt_thread_info *info, int rc)
1077         struct mdt_device *mdt = info->mti_mdt;
1078         struct ptlrpc_request *req = mdt_info_req(info);
1079         struct obd_export *exp = req->rq_export;
1081         __u64 last_committed;
1083         if (mdt == NULL || req == NULL || req->rq_repmsg == NULL)
1085         if (info->mti_trans_flags & MDT_NONEED_TANSNO)
1088         last_committed = mdt->mdt_last_committed;
1091                 last_transno = info->mti_transno;
1093                 if (info->mti_transno != 0)
1094                         CERROR("replay %s transno "LPU64" failed: rc %d\n",
1095                                libcfs_nid2str(exp->exp_connection->c_peer.nid),
1096                                info->mti_transno, rc);
1099         CDEBUG(D_INODE, "last_transno = %llu, last_committed = %llu\n",
1100                last_transno, last_committed);
1102         req->rq_repmsg->transno = req->rq_transno = last_transno;
1103         req->rq_repmsg->last_xid = req->rq_xid;
1104         req->rq_repmsg->last_committed = last_committed;
1105         exp->exp_obd->obd_last_committed = last_committed;
1110  * Invoke handler for this request opc. Also do necessary preprocessing
1111  * (according to handler ->mh_flags), and post-processing (setting of
1112  * ->last_{xid,committed}).
1114 static int mdt_req_handle(struct mdt_thread_info *info,
1115                           struct mdt_handler *h, struct ptlrpc_request *req)
1122         LASSERT(h->mh_act != NULL);
1123         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
1124         LASSERT(current->journal_info == NULL);
1126         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1128         if (h->mh_fail_id != 0)
1129                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
1132         flags = h->mh_flags;
1133         LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO), h->mh_fmt != NULL));
/* Unpack request / pack reply per the handler's declared format. */
1135         if (h->mh_fmt != NULL) {
1136                 req_capsule_set(&info->mti_pill, h->mh_fmt);
1137                 result = mdt_unpack_req_pack_rep(info, flags);
/* MUTABOR handlers modify the fs: reject early on read-only exports
 * (the -EROFS return itself is on an elided line). */
1140         if (result == 0 && flags & MUTABOR &&
1141             req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
/* HABEO_CLAVIS handlers carry a dlm request; unpack (and, for old
 * clients, convert) it before invoking the handler. */
1144         if (result == 0 && flags & HABEO_CLAVIS) {
1145                 struct ldlm_request *dlm_req;
1147                 LASSERT(h->mh_fmt != NULL);
1149                 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
1150                 if (dlm_req != NULL) {
1151                         if (info->mti_mdt->mdt_opts.mo_compat_resname)
1152                                 result = mdt_lock_resname_compat(info->mti_mdt,
1154                                 info->mti_dlm_req = dlm_req;
1156                         CERROR("Can't unpack dlm request\n");
1165                 result = h->mh_act(info);
1167          * XXX result value is unconditionally shoved into ->rq_status
1168          * (original code sometimes placed error code into ->rq_status, and
1169          * sometimes returned it to the
1170          * caller). ptlrpc_server_handle_request() doesn't check return value
1173         req->rq_status = result;
1175         LASSERT(current->journal_info == NULL);
1177         if (flags & HABEO_CLAVIS && info->mti_mdt->mdt_opts.mo_compat_resname) {
1178                 struct ldlm_reply *dlm_rep;
1180                 dlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1181                 if (dlm_rep != NULL)
1182                         result = mdt_lock_reply_compat(info->mti_mdt, dlm_rep);
1185         /* If we're DISCONNECTing, the mdt_export_data is already freed */
1187         if (h->mh_opc != MDS_DISCONNECT &&
1188             h->mh_opc != MDS_READPAGE &&
1189             h->mh_opc != LDLM_ENQUEUE) {
1190                 mdt_update_last_transno(info, result);
/* Reset a lock handle to its unused state. */
1196 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1198         lh->mlh_lh.cookie = 0ull;
1199         lh->mlh_mode = LCK_MINMODE;
/* Sanity check at teardown: the handle must no longer hold a lock. */
1202 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1204         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
/* Per-request setup of the thread-local mdt_thread_info: zero it, reset
 * reply buffer sizes and lock handles, then bind the request's context,
 * transno, device and reply capsule. */
1207 static void mdt_thread_info_init(struct ptlrpc_request *req,
1208                                  struct mdt_thread_info *info)
1212         memset(info, 0, sizeof(*info));
1214         info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1215         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1216                 info->mti_rep_buf_size[i] = -1;
1218         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1219                 mdt_lock_handle_init(&info->mti_lh[i]);
1221         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1222         info->mti_ctxt = req->rq_svc_thread->t_ctx;
1223         info->mti_transno = req->rq_reqmsg->transno;
1224         /* it can be NULL while CONNECT */
1226                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1227         req_capsule_init(&info->mti_pill, req, RCL_SERVER,
1228                          info->mti_rep_buf_size);
/* Per-request teardown: release the capsule, drop the request's object
 * reference (if any) and check all lock handles were released. */
1231 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1235         req_capsule_fini(&info->mti_pill);
1236         if (info->mti_object != NULL) {
1237                 mdt_object_put(info->mti_ctxt, info->mti_object);
1238                 info->mti_object = NULL;
1240         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1241                 mdt_lock_handle_fini(&info->mti_lh[i]);
1245 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
1246 struct obd_device *obd, int *process);
1248  * Handle recovery. Return:
1249  *        +1: continue request processing;
1250  *       -ve: abort immediately with the given error code;
1251  *         0: send reply with error code in req->rq_status;
1253 static int mdt_recovery(struct ptlrpc_request *req)
1257         struct obd_device *obd;
/* CONNECT needs no export yet; everything else must be connected. */
1261         if (req->rq_reqmsg->opc == MDS_CONNECT)
1264         if (req->rq_export == NULL) {
1265                 CERROR("operation %d on unconnected MDS from %s\n",
1266                        req->rq_reqmsg->opc,
1267                        libcfs_id2str(req->rq_peer));
1268                 req->rq_status = -ENOTCONN;
1272         /* sanity check: if the xid matches, the request must be marked as a
1273          * resent or replayed */
1274         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
1275                       lustre_msg_get_flags(req->rq_reqmsg) &
1276                       (MSG_RESENT | MSG_REPLAY)),
1277                  "rq_xid "LPU64" matches last_xid, "
1278                  "expected RESENT flag\n", req->rq_xid);
1280         /* else: note the opposite is not always true; a RESENT req after a
1281          * failover will usually not match the last_xid, since it was likely
1282          * never committed. A REPLAYed request will almost never match the
1283          * last xid, however it could for a committed, but still retained,
1286         obd = req->rq_export->exp_obd;
1288         /* Check for aborted recovery... */
1289         spin_lock_bh(&obd->obd_processing_task_lock);
1290         abort_recovery = obd->obd_abort_recovery;
1291         recovering = obd->obd_recovering;
1292         spin_unlock_bh(&obd->obd_processing_task_lock);
1293         if (abort_recovery) {
1294                 target_abort_recovery(obd);
1295         } else if (recovering) {
/* In recovery, only certain requests may proceed; the rest are queued
 * or rejected by the filter (continuation elided in this excerpt). */
1299                 rc = mds_filter_recovery_request(req, obd, &should_process);
1300                 if (rc != 0 || !should_process) {
/* Send (or queue) the reply.  During recovery, MSG_LAST_REPLAY requests
 * get their reply queued until recovery completes; losing the race with
 * recovery end yields -ENOTCONN. */
1308 static int mdt_reply(struct ptlrpc_request *req, int result,
1309                      struct mdt_thread_info *info)
1311         struct obd_device *obd;
1314         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1315                 if (req->rq_reqmsg->opc != OBD_PING)
1316                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1318                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1319                 if (obd && obd->obd_recovering) {
1320                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1321                         RETURN(target_queue_final_reply(req, result));
1323                         /* Lost a race with recovery; let the error path
1325                         result = req->rq_status = -ENOTCONN;
1328         target_send_reply(req, result, info->mti_fail_id);
1333 extern int mds_msg_check_version(struct lustre_msg *msg);
/* Top-level dispatch for one request: version check, recovery gating,
 * handler lookup in @supported and execution, then reply.  Malformed
 * requests are dropped (error path partly elided in this excerpt). */
1335 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info,
1336                        struct mdt_opc_slice *supported)
1338         struct mdt_handler *h;
1339         struct lustre_msg *msg;
1344         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1346         LASSERT(current->journal_info == NULL);
1348         msg = req->rq_reqmsg;
1349         result = mds_msg_check_version(msg);
1351                 result = mdt_recovery(req);
1354                         h = mdt_handler_find(msg->opc, supported);
1356                                 result = mdt_req_handle(info, h, req);
1358                                 req->rq_status = -ENOTSUPP;
1359                                 result = ptlrpc_error(req);
1364                         result = mdt_reply(req, result, info);
1367                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
1372 * MDT handler function called by a ptlrpc service thread when a request arrives.
1374 * XXX common "target" functionality should be factored into a separate module
1375 * shared by mdt, ost, and stand-alone services such as fld.
/*
 * Shared entry point for all MDT ptlrpc services: fetch this service
 * thread's mdt_thread_info from its lu_context, init it for the request,
 * dispatch through mdt_handle0(), and tear the info down again.
 */
1377 static int mdt_handle_common(struct ptlrpc_request *req,
1378 struct mdt_opc_slice *supported)
1380 struct lu_context *ctx;
1381 struct mdt_thread_info *info;
1385 ctx = req->rq_svc_thread->t_ctx;
1386 LASSERT(ctx != NULL);
1387 LASSERT(ctx->lc_thread == req->rq_svc_thread);
1388 info = lu_context_key_get(ctx, &mdt_thread_key);
1389 LASSERT(info != NULL);
1391 mdt_thread_info_init(req, info);
1393 result = mdt_handle0(req, info, supported);
1395 mdt_thread_info_fini(info);
/* Handler for the main MDS request portal: dispatch via mdt_handlers. */
1399 static int mdt_handle(struct ptlrpc_request *req)
1401 return mdt_handle_common(req, mdt_handlers);
/* Handler for the readpage portal: dispatch via mdt_readpage_handlers. */
1404 static int mdt_readpage_handle(struct ptlrpc_request *req)
1406 return mdt_handle_common(req, mdt_readpage_handlers);
1409 /* Please move these functions from mds to mdt. */
/* Test whether 'flag' is set in the intent reply's disposition bits. */
1410 int intent_disposition(struct ldlm_reply *rep, int flag)
1414 return (rep->lock_policy_res1 & flag);
/* Set 'flag' in the intent reply's disposition bits. */
1417 void intent_set_disposition(struct ldlm_reply *rep, int flag)
1421 rep->lock_policy_res1 |= flag;
1437 static int mdt_intent_getattr(enum mdt_it_code opcode,
1438 struct mdt_thread_info *info,
1439 struct ldlm_lock **,
1441 static int mdt_intent_reint(enum mdt_it_code opcode,
1442 struct mdt_thread_info *info,
1443 struct ldlm_lock **,
/*
 * Per-intent-opcode dispatch table: request format, handler flags,
 * policy callback and (for reint intents) the expected REINT_* opcode.
 * Indexed by enum mdt_it_code (see mdt_intent_code()).
 */
1446 static struct mdt_it_flavor {
1447 const struct req_format *it_fmt;
1449 int (*it_act)(enum mdt_it_code ,
1450 struct mdt_thread_info *,
1451 struct ldlm_lock **,
1454 } mdt_it_flavor[] = {
1456 .it_fmt = &RQF_LDLM_INTENT,
1457 /*.it_flags = HABEO_REFERO,*/
1459 .it_act = mdt_intent_reint,
1460 .it_reint = REINT_OPEN
1463 .it_fmt = &RQF_LDLM_INTENT,
1464 .it_flags = MUTABOR,
1465 .it_act = mdt_intent_reint,
1466 .it_reint = REINT_OPEN
1469 .it_fmt = &RQF_LDLM_INTENT,
1470 .it_flags = MUTABOR,
1471 .it_act = mdt_intent_reint,
1472 .it_reint = REINT_CREATE
1474 [MDT_IT_GETATTR] = {
1475 .it_fmt = &RQF_LDLM_INTENT_GETATTR,
1477 .it_act = mdt_intent_getattr
1479 [MDT_IT_READDIR] = {
1485 .it_fmt = &RQF_LDLM_INTENT_GETATTR,
1487 .it_act = mdt_intent_getattr
1490 .it_fmt = &RQF_LDLM_INTENT_UNLINK,
1491 .it_flags = MUTABOR,
1492 .it_act = NULL, /* XXX can be mdt_intent_reint, ? */
1493 .it_reint = REINT_UNLINK
1497 .it_flags = MUTABOR,
1500 [MDT_IT_GETXATTR] = {
/*
 * Policy function for LOOKUP/GETATTR intents.  Packs the reply, performs
 * the name lookup + lock via mdt_getattr_name_lock(), and either aborts
 * the enqueue (ELDLM_LOCK_ABORTED) or hands the acquired lock back to
 * the client by fixing it up and returning ELDLM_LOCK_REPLACED.
 */
1507 static int mdt_intent_getattr(enum mdt_it_code opcode,
1508 struct mdt_thread_info *info,
1509 struct ldlm_lock **lockp,
1512 struct ldlm_lock *old_lock = *lockp;
1513 struct ldlm_lock *new_lock = NULL;
1514 struct ptlrpc_request *req = mdt_info_req(info);
1515 struct ldlm_reply *ldlm_rep;
1516 struct mdt_lock_handle tmp_lock;
1517 struct mdt_lock_handle *lhc = &tmp_lock;
1518 struct mdt_device *mdt = info->mti_mdt;
/* LOOKUP intent needs only the LOOKUP ibit; GETATTR needs UPDATE too */
1526 child_bits = MDS_INODELOCK_LOOKUP;
1528 case MDT_IT_GETATTR:
1529 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1532 CERROR("Unhandled till now");
1536 req_capsule_set_size(&info->mti_pill, &RMF_MDT_MD,
1537 RCL_SERVER, mdt->mdt_max_mdsize);
1539 rc = req_capsule_pack(&info->mti_pill);
1542 ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1543 intent_set_disposition(ldlm_rep, DISP_IT_EXECD);
1545 ldlm_rep->lock_policy_res2 =
1546 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
/* a negative lookup is a successful intent result, not an error */
1548 if (intent_disposition(ldlm_rep, DISP_LOOKUP_NEG))
1549 ldlm_rep->lock_policy_res2 = 0;
1550 if (!intent_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
1551 ldlm_rep->lock_policy_res2) {
1552 RETURN(ELDLM_LOCK_ABORTED);
1555 new_lock = ldlm_handle2lock(&lhc->mlh_lh);
1556 if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1559 LASSERTF(new_lock != NULL, "op %d lockh "LPX64"\n",
1560 opcode, lhc->mlh_lh.cookie);
1564 /* FIXME: this only happens when the MDT can handle RESENT */
1565 if (new_lock->l_export == req->rq_export) {
1566 /* Already gave this to the client, which means that we
1567 * reconstructed a reply. */
1568 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1570 RETURN(ELDLM_LOCK_REPLACED);
1574 * This code is copied from mds/handler.c and should be factored into
1575 * the ldlm module so it can be shared and merged easily.
1578 /* Fixup the lock to be given to the client */
1579 l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1580 new_lock->l_readers = 0;
1581 new_lock->l_writers = 0;
/* transfer ownership of the lock to the client's export */
1583 new_lock->l_export = class_export_get(req->rq_export);
1584 list_add(&new_lock->l_export_chain,
1585 &new_lock->l_export->exp_ldlm_data.led_held_locks);
1587 new_lock->l_blocking_ast = old_lock->l_blocking_ast;
1588 new_lock->l_completion_ast = old_lock->l_completion_ast;
1590 new_lock->l_remote_handle = old_lock->l_remote_handle;
1592 new_lock->l_flags &= ~LDLM_FL_LOCAL;
1594 l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1595 LDLM_LOCK_PUT(new_lock);
1597 RETURN(ELDLM_LOCK_REPLACED);
/*
 * Policy function for reint intents (open/create).  Decodes the REINT_*
 * opcode, checks it matches the intent flavor, runs the reint, records
 * the result in the DLM reply and always aborts the lock enqueue.
 */
1600 static int mdt_intent_reint(enum mdt_it_code opcode,
1601 struct mdt_thread_info *info,
1602 struct ldlm_lock **lockp,
1607 struct ldlm_reply *rep;
1609 static const struct req_format *intent_fmts[REINT_MAX] = {
1610 [REINT_CREATE] = &RQF_LDLM_INTENT_CREATE,
1611 [REINT_OPEN] = &RQF_LDLM_INTENT_OPEN
1616 opc = mdt_reint_opcode(info, intent_fmts);
/* the reint opcode in the request must agree with the intent flavor */
1620 if (mdt_it_flavor[opcode].it_reint != opc) {
1621 CERROR("Reint code %ld doesn't match intent: %d\n",
1626 rc = mdt_reint_internal(info, opc);
1628 rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1631 rep->lock_policy_res2 = rc;
1633 intent_set_disposition(rep, DISP_IT_EXECD);
1635 mdt_update_last_transno(info, rep->lock_policy_res2);
1637 RETURN(ELDLM_LOCK_ABORTED);
/* Map a wire IT_* intent opcode to the local MDT_IT_* table index. */
1640 static int mdt_intent_code(long itcode)
1646 result = MDT_IT_OPEN;
1648 case IT_OPEN|IT_CREAT:
1649 result = MDT_IT_OCREAT;
1652 result = MDT_IT_CREATE;
1655 result = MDT_IT_READDIR;
1658 result = MDT_IT_GETATTR;
1661 result = MDT_IT_LOOKUP;
1664 result = MDT_IT_UNLINK;
1667 result = MDT_IT_TRUNC;
1670 result = MDT_IT_GETXATTR;
1673 CERROR("Unknown intent opcode: %ld\n", itcode);
/*
 * Dispatch an intent: select the flavor for the intent code, extend the
 * request capsule with the flavor's format, unpack/pack, reject mutating
 * intents on read-only exports, then execute the flavor policy callback.
 */
1680 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
1681 struct ldlm_lock **lockp, int flags)
1683 struct req_capsule *pill;
1684 struct mdt_it_flavor *flv;
1689 opc = mdt_intent_code(itopc);
1693 pill = &info->mti_pill;
1694 flv = &mdt_it_flavor[opc];
1696 if (flv->it_fmt != NULL)
1697 req_capsule_extend(pill, flv->it_fmt);
1699 rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
1701 struct ptlrpc_request *req = mdt_info_req(info);
/* MUTABOR intents are refused on read-only connections */
1702 if (flv->it_flags & MUTABOR &&
1703 req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1706 if (rc == 0 && flv->it_act != NULL) {
1707 /* execute policy */
1708 rc = flv->it_act(opc, info, lockp, flags);
/*
 * LDLM intent policy callback registered via ldlm_register_intent().
 * If the request message carries an intent buffer, dispatch it through
 * mdt_intent_opc(); otherwise it is a plain enqueue and only the reply
 * is packed.
 */
1714 static int mdt_intent_policy(struct ldlm_namespace *ns,
1715 struct ldlm_lock **lockp, void *req_cookie,
1716 ldlm_mode_t mode, int flags, void *data)
1718 struct mdt_thread_info *info;
1719 struct ptlrpc_request *req = req_cookie;
1720 struct ldlm_intent *it;
1721 struct req_capsule *pill;
1722 struct ldlm_lock *lock = *lockp;
1727 LASSERT(req != NULL);
1729 info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
1730 LASSERT(info != NULL);
1731 pill = &info->mti_pill;
1732 LASSERT(pill->rc_req == req);
/* an intent buffer follows the enqueue body when bufcount is large enough */
1734 if (req->rq_reqmsg->bufcount > MDS_REQ_INTENT_IT_OFF) {
1735 req_capsule_extend(pill, &RQF_LDLM_INTENT);
1736 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
1738 LDLM_DEBUG(lock, "intent policy opc: %s",
1739 ldlm_it2str(it->opc));
1741 rc = mdt_intent_opc(it->opc, info, lockp, flags);
1747 /* No intent was provided */
1748 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
1749 rc = req_capsule_pack(pill);
/* Tear down the site's server and controller sequence managers, if set. */
1757 static int mdt_seq_fini(const struct lu_context *ctx,
1758 struct mdt_device *m)
1760 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1763 if (ls && ls->ls_server_seq) {
1764 seq_server_fini(ls->ls_server_seq, ctx);
1765 OBD_FREE_PTR(ls->ls_server_seq);
1766 ls->ls_server_seq = NULL;
1768 if (ls && ls->ls_control_seq) {
1769 seq_server_fini(ls->ls_control_seq, ctx);
1770 OBD_FREE_PTR(ls->ls_control_seq);
1771 ls->ls_control_seq = NULL;
/*
 * Allocate and init the sequence servers for this site: the controller
 * sequence only on node 0, plus the regular server sequence on every
 * node.  On failure everything is undone via mdt_seq_fini().
 */
1776 static int mdt_seq_init(const struct lu_context *ctx,
1778 struct mdt_device *m)
1784 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1786 /* sequence-controller node */
1787 if (ls->ls_node_id == 0) {
1788 LASSERT(ls->ls_control_seq == NULL);
1789 OBD_ALLOC_PTR(ls->ls_control_seq);
1791 if (ls->ls_control_seq != NULL) {
1792 rc = seq_server_init(ls->ls_control_seq,
1793 m->mdt_bottom, uuid,
1800 LASSERT(ls->ls_server_seq == NULL);
1801 OBD_ALLOC_PTR(ls->ls_server_seq);
1803 if (ls->ls_server_seq != NULL) {
1804 rc = seq_server_init(ls->ls_server_seq,
1805 m->mdt_bottom, uuid,
1812 mdt_seq_fini(ctx, m);
1818 * Init client sequence manager which is used by local MDS to talk to sequence
1819 * controller on remote node.
/*
 * Set up the client-side sequence manager used by a local MDS to talk
 * to the sequence controller on a remote node: parse the MDC index from
 * the config, find and connect to the controller MDC, then create the
 * seq client and attach it to the local server sequence.
 */
1821 static int mdt_seq_init_cli(const struct lu_context *ctx,
1822 struct mdt_device *m,
1823 struct lustre_cfg *cfg)
1825 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1826 struct obd_device *mdc;
1827 struct obd_uuid *uuidp;
1831 struct mdt_thread_info *info;
1832 char *p, *index_string = lustre_cfg_string(cfg, 2);
1835 info = lu_context_key_get(ctx, &mdt_thread_key);
1836 uuidp = &info->mti_u.uuid;
1838 LASSERT(index_string);
1840 index = simple_strtol(index_string, &p, 10);
1842 CERROR("Invalid index in lustre_cgf, offset 2\n");
1846 /* check if this is first MDC add and controller is not yet
/* only the first MDC (index 0) becomes the controller connection */
1848 if (index != 0 || ls->ls_client_exp)
1851 uuid_str = lustre_cfg_string(cfg, 1);
1852 obd_str2uuid(uuidp, uuid_str);
1853 mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, NULL);
1855 CERROR("can't find controller MDC by uuid %s\n",
1858 } else if (!mdc->obd_set_up) {
1859 CERROR("target %s not set up\n", mdc->obd_name);
1862 struct lustre_handle conn = {0, };
1864 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
1865 mdc->obd_name, mdc->obd_uuid.uuid);
1867 rc = obd_connect(ctx, &conn, mdc, &mdc->obd_uuid, NULL);
1870 CERROR("target %s connect error %d\n",
1873 ls->ls_client_exp = class_conn2export(&conn);
1875 OBD_ALLOC_PTR(ls->ls_client_seq);
1877 if (ls->ls_client_seq != NULL) {
1878 rc = seq_client_init(ls->ls_client_seq,
1887 LASSERT(ls->ls_server_seq != NULL);
1889 rc = seq_server_set_cli(ls->ls_server_seq,
/*
 * Tear down the client-side sequence manager: detach it from the server
 * sequence, free it, and disconnect the controller export.
 */
1898 static void mdt_seq_fini_cli(struct mdt_device *m)
1904 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1906 if (ls && ls->ls_server_seq)
1907 seq_server_set_cli(ls->ls_server_seq,
1910 if (ls && ls->ls_client_seq) {
1911 seq_client_fini(ls->ls_client_seq);
1912 OBD_FREE_PTR(ls->ls_client_seq);
1913 ls->ls_client_seq = NULL;
1916 if (ls && ls->ls_client_exp) {
1917 int rc = obd_disconnect(ls->ls_client_exp);
1918 ls->ls_client_exp = NULL;
1921 CERROR("failure to disconnect "
/* Allocate and init the site's FLD server; free it again on failure. */
1931 static int mdt_fld_init(const struct lu_context *ctx,
1933 struct mdt_device *m)
1939 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1941 OBD_ALLOC_PTR(ls->ls_server_fld);
1943 if (ls->ls_server_fld != NULL) {
1944 rc = fld_server_init(ls->ls_server_fld, ctx,
1945 m->mdt_bottom, uuid);
1947 OBD_FREE_PTR(ls->ls_server_fld);
1948 ls->ls_server_fld = NULL;
/* Finalize and free the site's FLD server, if present. */
1956 static int mdt_fld_fini(const struct lu_context *ctx,
1957 struct mdt_device *m)
1959 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1962 if (ls && ls->ls_server_fld) {
1963 fld_server_fini(ls->ls_server_fld, ctx);
1964 OBD_FREE_PTR(ls->ls_server_fld);
1965 ls->ls_server_fld = NULL;
1970 /* device init/fini methods */
/* Unregister the main, readpage and setattr ptlrpc services (if started). */
1971 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
1973 if (m->mdt_service != NULL) {
1974 ptlrpc_unregister_service(m->mdt_service);
1975 m->mdt_service = NULL;
1977 if (m->mdt_readpage_service != NULL) {
1978 ptlrpc_unregister_service(m->mdt_readpage_service);
1979 m->mdt_readpage_service = NULL;
1981 if (m->mdt_setattr_service != NULL) {
1982 ptlrpc_unregister_service(m->mdt_setattr_service);
1983 m->mdt_setattr_service = NULL;
/*
 * Start the three MDT ptlrpc services (main request, readpage, setattr)
 * plus the ldlm callback client.  Any failure unwinds everything started
 * so far via mdt_stop_ptlrpc_service().
 */
1987 static int mdt_start_ptlrpc_service(struct mdt_device *m)
1990 struct ptlrpc_service_conf conf = {
1991 .psc_nbufs = MDS_NBUFS,
1992 .psc_bufsize = MDS_BUFSIZE,
1993 .psc_max_req_size = MDS_MAXREQSIZE,
1994 .psc_max_reply_size = MDS_MAXREPSIZE,
1995 .psc_req_portal = MDS_REQUEST_PORTAL,
1996 .psc_rep_portal = MDC_REPLY_PORTAL,
1997 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
1999 * We'd like to have a mechanism to set this on a per-device
2000 * basis, but alas...
/* thread count is clamped between MDT_MIN_THREADS and the max */
2002 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2004 .psc_ctx_tags = LCT_MD_THREAD
2009 m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
2010 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2011 "mdt_ldlm_client", m->mdt_ldlm_client);
2014 ptlrpc_init_svc_conf(&conf, mdt_handle, LUSTRE_MDT0_NAME,
2015 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2017 if (m->mdt_service == NULL)
2020 rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
2022 GOTO(err_mdt_svc, rc);
2025 * readpage service configuration. Parameters have to be adjusted,
2028 conf = (typeof(conf)) {
2029 .psc_nbufs = MDS_NBUFS,
2030 .psc_bufsize = MDS_BUFSIZE,
2031 .psc_max_req_size = MDS_MAXREQSIZE,
2032 .psc_max_reply_size = MDS_MAXREPSIZE,
2033 .psc_req_portal = MDS_READPAGE_PORTAL,
2034 .psc_rep_portal = MDC_REPLY_PORTAL,
2035 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2036 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2038 .psc_ctx_tags = LCT_MD_THREAD
2040 m->mdt_readpage_service =
2041 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
2042 LUSTRE_MDT0_NAME "_readpage",
2043 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2046 if (m->mdt_readpage_service == NULL) {
2047 CERROR("failed to start readpage service\n");
2048 GOTO(err_mdt_svc, rc = -ENOMEM);
2051 rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
2054 * setattr service configuration.
/* NOTE(review): setattr service reuses mdt_handle, only the portal differs */
2056 conf = (typeof(conf)) {
2057 .psc_nbufs = MDS_NBUFS,
2058 .psc_bufsize = MDS_BUFSIZE,
2059 .psc_max_req_size = MDS_MAXREQSIZE,
2060 .psc_max_reply_size = MDS_MAXREPSIZE,
2061 .psc_req_portal = MDS_SETATTR_PORTAL,
2062 .psc_rep_portal = MDC_REPLY_PORTAL,
2063 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2064 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2066 .psc_ctx_tags = LCT_MD_THREAD
2069 m->mdt_setattr_service =
2070 ptlrpc_init_svc_conf(&conf, mdt_handle,
2071 LUSTRE_MDT0_NAME "_setattr",
2072 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2075 if (!m->mdt_setattr_service) {
2076 CERROR("failed to start setattr service\n");
2077 GOTO(err_mdt_svc, rc = -ENOMEM);
2080 rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
2082 GOTO(err_mdt_svc, rc);
2087 mdt_stop_ptlrpc_service(m);
/*
 * Walk down the device stack from 'd', calling each layer's fini/free
 * and dropping the obd type reference, iteratively rather than
 * recursively (fini returns the next lower device).
 */
2092 static void mdt_stack_fini(const struct lu_context *ctx,
2093 struct mdt_device *m, struct lu_device *d)
2095 /* goes through all stack */
2097 struct lu_device *n;
2098 struct obd_type *type;
2099 struct lu_device_type *ldt = d->ld_type;
2103 /* each fini() returns the next device in the stack of layers,
2104 * so we can avoid recursion */
2105 n = ldt->ldt_ops->ldto_device_fini(ctx, d);
2106 ldt->ldt_ops->ldto_device_free(ctx, d);
2108 type = ldt->ldt_obd_type;
2110 class_put_type(type);
2111 /* switch to the next device in the layer */
2114 m->mdt_child = NULL;
/*
 * Allocate and initialize one layer of the device stack by obd type
 * name, attaching it above 'child'.  Returns the new device, or an
 * ERR_PTR; intermediate failures unwind the allocation and type ref.
 */
2117 static struct lu_device *mdt_layer_setup(const struct lu_context *ctx,
2118 const char *typename,
2119 struct lu_device *child,
2120 struct lustre_cfg *cfg)
2122 struct obd_type *type;
2123 struct lu_device_type *ldt;
2124 struct lu_device *d;
2128 type = class_get_type(typename);
2130 CERROR("Unknown type: '%s'\n", typename);
2131 GOTO(out, rc = -ENODEV);
2136 CERROR("type: '%s'\n", typename);
2137 GOTO(out_type, rc = -EINVAL);
2140 ldt->ldt_obd_type = type;
2141 d = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
2143 CERROR("Cannot allocate device: '%s'\n", typename);
2144 GOTO(out_type, rc = -ENODEV);
/* the new layer shares the child's site */
2147 LASSERT(child->ld_site);
2148 d->ld_site = child->ld_site;
2151 rc = ldt->ldt_ops->ldto_device_init(ctx, d, child);
2153 CERROR("can't init device '%s', rc %d\n", typename, rc);
2154 GOTO(out_alloc, rc);
2160 ldt->ldt_ops->ldto_device_free(ctx, d);
2163 class_put_type(type);
/*
 * Build the MDT device stack (osd -> mdd -> cmm), wire the upcall
 * devices between the layers, and run the setup config through the
 * stack.  On error, fini from the last known good device.
 */
2168 static int mdt_stack_init(const struct lu_context *ctx,
2169 struct mdt_device *m, struct lustre_cfg *cfg)
2171 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2172 struct lu_device *tmp;
2173 struct md_device *md;
2177 /* init the stack */
2178 tmp = mdt_layer_setup(ctx, LUSTRE_OSD0_NAME, d, cfg);
2180 RETURN(PTR_ERR(tmp));
2182 m->mdt_bottom = lu2dt_dev(tmp);
2184 tmp = mdt_layer_setup(ctx, LUSTRE_MDD0_NAME, d, cfg);
2186 GOTO(out, rc = PTR_ERR(tmp));
2191 tmp = mdt_layer_setup(ctx, LUSTRE_CMM0_NAME, d, cfg);
2193 GOTO(out, rc = PTR_ERR(tmp));
2196 /*set mdd upcall device*/
2197 md->md_upcall.mu_upcall_dev = lu2md_dev(d);
2200 /*set cmm upcall device*/
2201 md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
2203 m->mdt_child = lu2md_dev(d);
2205 /* process setup config */
2206 tmp = &m->mdt_md_dev.md_lu_dev;
2207 rc = tmp->ld_ops->ldo_process_config(ctx, tmp, cfg);
2211 /* fini from last known good lu_device */
2213 mdt_stack_fini(ctx, m, d);
/*
 * Full device teardown, the reverse of mdt_init0(): fs cleanup, stop
 * ping evictor and ptlrpc services, fini the stack, fld and seq
 * managers, and free the ldlm namespace.
 */
2218 static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
2220 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2221 struct lu_site *ls = d->ld_site;
2226 mdt_fs_cleanup(ctx, m);
2227 ping_evictor_stop();
2228 mdt_stop_ptlrpc_service(m);
2230 /* finish the stack */
2231 mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2233 mdt_fld_fini(ctx, m);
2234 mdt_seq_fini(ctx, m);
2235 mdt_seq_fini_cli(m);
/* by now all references to this device must be gone */
2237 LASSERT(atomic_read(&d->ld_ref) == 0);
2238 md_device_fini(&m->mdt_md_dev);
2240 if (m->mdt_namespace != NULL) {
2241 ldlm_namespace_free(m->mdt_namespace, 0);
2242 m->mdt_namespace = NULL;
/*
 * Device initialization: set up device state, the lu_site, the layered
 * stack, fld/seq managers, the ldlm namespace with the intent policy,
 * the ptlrpc services, the ping evictor and the backing fs.  Each
 * failure unwinds through the err_* labels in reverse order.
 */
2253 static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
2254 struct lu_device_type *t, struct lustre_cfg *cfg)
2256 struct mdt_thread_info *info;
2257 struct obd_device *obd;
2258 const char *dev = lustre_cfg_string(cfg, 0);
2259 const char *num = lustre_cfg_string(cfg, 2);
2264 info = lu_context_key_get(ctx, &mdt_thread_key);
2265 LASSERT(info != NULL);
2267 obd = class_name2obd(dev);
2269 spin_lock_init(&m->mdt_transno_lock);
2270 /* FIXME: We need to load them from disk. But now fake it */
2271 m->mdt_last_transno = 1;
2272 m->mdt_last_committed = 1;
2273 m->mdt_max_mdsize = MAX_MD_SIZE;
2274 m->mdt_max_cookiesize = sizeof(struct llog_cookie);
2276 spin_lock_init(&m->mdt_epoch_lock);
2277 /* Temporary. should parse mount option. */
2278 m->mdt_opts.mo_user_xattr = 0;
2279 m->mdt_opts.mo_acl = 0;
2280 m->mdt_opts.mo_compat_resname = 0;
2281 obd->obd_replayable = 1;
2288 md_device_init(&m->mdt_md_dev, t);
2289 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
2290 m->mdt_md_dev.md_lu_dev.ld_obd = obd;
2292 rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
2294 CERROR("can't init lu_site, rc %d\n", rc);
2295 GOTO(err_free_site, rc);
2298 /* init the stack */
2299 rc = mdt_stack_init(ctx, m, cfg);
2301 CERROR("can't init device stack, rc %d\n", rc);
2302 GOTO(err_fini_site, rc);
2304 /* set server index */
2306 s->ls_node_id = simple_strtol(num, NULL, 10);
2308 rc = mdt_fld_init(ctx, obd->obd_name, m);
2310 GOTO(err_fini_stack, rc);
2312 rc = mdt_seq_init(ctx, obd->obd_name, m);
2314 GOTO(err_fini_fld, rc);
/* namespace name embeds the device pointer for uniqueness */
2316 snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
2317 LUSTRE_MDT0_NAME"-%p", m);
2318 m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
2319 LDLM_NAMESPACE_SERVER);
2320 if (m->mdt_namespace == NULL)
2321 GOTO(err_fini_seq, rc = -ENOMEM);
2323 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
2325 rc = mdt_start_ptlrpc_service(m);
2327 GOTO(err_free_ns, rc);
2329 ping_evictor_start();
2330 rc = mdt_fs_setup(ctx, m);
2332 GOTO(err_stop_service, rc);
2336 mdt_stop_ptlrpc_service(m);
2338 ldlm_namespace_free(m->mdt_namespace, 0);
2339 m->mdt_namespace = NULL;
2341 mdt_seq_fini(ctx, m);
2343 mdt_fld_fini(ctx, m);
2345 mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2350 md_device_fini(&m->mdt_md_dev);
2354 /* used by MGS to process specific configurations */
2355 static int mdt_process_config(const struct lu_context *ctx,
2356 struct lu_device *d, struct lustre_cfg *cfg)
2358 struct mdt_device *m = mdt_dev(d);
2359 struct md_device *md_next = m->mdt_child;
2360 struct lu_device *next = md2lu_dev(md_next);
2364 switch (cfg->lcfg_command) {
2367 * Add mdc hook to get first MDT uuid and connect it to
2368 * ls->controller to use for seq manager.
2370 err = mdt_seq_init_cli(ctx, mdt_dev(d), cfg);
2372 CERROR("can't initialize controller export, "
2376 /* others are passed further */
2377 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
/*
 * Allocate an mdt_object and initialize its lu_object/header as the top
 * object of the compound object for this device.
 */
2383 static struct lu_object *mdt_object_alloc(const struct lu_context *ctxt,
2384 const struct lu_object_header *hdr,
2385 struct lu_device *d)
2387 struct mdt_object *mo;
2393 struct lu_object *o;
2394 struct lu_object_header *h;
2396 o = &mo->mot_obj.mo_lu;
2397 h = &mo->mot_header;
2398 lu_object_header_init(h);
2399 lu_object_init(o, h, d);
2400 lu_object_add_top(h, o);
2401 o->lo_ops = &mdt_obj_ops;
/*
 * lu_object init: allocate the lower-layer (child) object for the same
 * header and link it below this one.
 */
2409 static int mdt_object_init(const struct lu_context *ctxt, struct lu_object *o)
2410 struct mdt_device *d = mdt_dev(o->lo_dev);
2411 struct lu_device *under;
2412 struct lu_object *below;
2415 CDEBUG(D_INODE, "object init, fid = "DFID3"\n",
2416 PFID3(lu_object_fid(o)));
2418 under = &d->mdt_child->md_lu_dev;
2419 below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
2420 if (below != NULL) {
2421 lu_object_add(o, below);
/* lu_object free: finalize the object and its header. */
2429 static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o)
2430 struct mdt_object *mo = mdt_obj(o);
2431 struct lu_object_header *h;
2434 CDEBUG(D_INODE, "object free, fid = "DFID3"\n",
2435 PFID3(lu_object_fid(o)));
2438 lu_object_header_fini(h);
/* lu_object print: emit a short object identity via the given printer. */
2443 static int mdt_object_print(const struct lu_context *ctxt, void *cookie,
2444 lu_printer_t p, const struct lu_object *o)
2446 return (*p)(ctxt, cookie, LUSTRE_MDT0_NAME"-object@%p", o);
/* Existence check is delegated to the next (lower) layer's object. */
2449 int mdt_object_exists(const struct lu_context *ctx,
2450 const struct lu_object *o)
2452 return lu_object_exists(ctx, lu_object_next(o));
/* lu_device / lu_object operation vectors for the MDT layer. */
2455 static struct lu_device_operations mdt_lu_ops = {
2456 .ldo_object_alloc = mdt_object_alloc,
2457 .ldo_process_config = mdt_process_config
2460 static struct lu_object_operations mdt_obj_ops = {
2461 .loo_object_init = mdt_object_init,
2462 .loo_object_free = mdt_object_free,
2463 .loo_object_print = mdt_object_print,
2464 .loo_object_exists = mdt_object_exists
2467 /* mds_connect_internal */
/*
 * Negotiate connect flags with the client (counterpart of
 * mds_connect_internal): mask the request down to what the server
 * supports, drop IBITS/ACL/XATTR when unavailable, and record the
 * agreed flags on the export.
 */
2468 static int mdt_connect0(struct mdt_device *mdt,
2469 struct obd_export *exp, struct obd_connect_data *data)
2472 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
2473 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
2475 /* If no known bits (which should not happen, probably,
2476 as everybody should support LOOKUP and UPDATE bits at least)
2477 revert to compat mode with plain locks. */
2478 if (!data->ocd_ibits_known &&
2479 data->ocd_connect_flags & OBD_CONNECT_IBITS)
2480 data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
2482 if (!mdt->mdt_opts.mo_acl)
2483 data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
2485 if (!mdt->mdt_opts.mo_user_xattr)
2486 data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
2488 exp->exp_connect_flags = data->ocd_connect_flags;
2489 data->ocd_version = LUSTRE_VERSION_CODE;
2490 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
/* warn when the server mandates ACLs but the client lacks support */
2493 if (mdt->mdt_opts.mo_acl &&
2494 ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
2495 CWARN("%s: MDS requires ACL support but client does not\n",
2496 mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
2502 /* mds_connect copy */
/*
 * obd connect method (copied from mds_connect): create the export,
 * negotiate flags via mdt_connect0(), and record the client uuid;
 * on failure the export is disconnected again.
 */
2503 static int mdt_obd_connect(const struct lu_context *ctx,
2504 struct lustre_handle *conn, struct obd_device *obd,
2505 struct obd_uuid *cluuid,
2506 struct obd_connect_data *data)
2508 struct obd_export *exp;
2509 struct mdt_device *mdt;
2510 struct mdt_export_data *med;
2511 struct mdt_client_data *mcd;
2515 LASSERT(ctx != NULL);
2516 if (!conn || !obd || !cluuid)
2519 mdt = mdt_dev(obd->obd_lu_dev);
2521 rc = class_connect(conn, obd, cluuid);
2525 exp = class_conn2export(conn);
2526 LASSERT(exp != NULL);
2527 med = &exp->exp_mdt_data;
2529 rc = mdt_connect0(mdt, exp, data);
2533 memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
2536 * rc = mdt_client_add(ctx, mdt, med, -1);
2544 class_disconnect(exp);
2546 class_export_put(exp);
/*
 * obd disconnect method: disconnect the export first so clients cannot
 * keep using it, then complete all outstanding difficult replies under
 * exp_lock before dropping the export reference.
 */
2551 static int mdt_obd_disconnect(struct obd_export *exp)
2553 unsigned long irqflags;
2558 class_export_get(exp);
2560 /* Disconnect early so that clients can't keep using export */
2561 rc = class_disconnect(exp);
2562 //ldlm_cancel_locks_for_export(exp);
2564 /* complete all outstanding replies */
2565 spin_lock_irqsave(&exp->exp_lock, irqflags);
2566 while (!list_empty(&exp->exp_outstanding_replies)) {
2567 struct ptlrpc_reply_state *rs =
2568 list_entry(exp->exp_outstanding_replies.next,
2569 struct ptlrpc_reply_state, rs_exp_list);
2570 struct ptlrpc_service *svc = rs->rs_service;
/* the service lock protects the reply state's list membership */
2572 spin_lock(&svc->srv_lock);
2573 list_del_init(&rs->rs_exp_list);
2574 ptlrpc_schedule_difficult_reply(rs);
2575 spin_unlock(&svc->srv_lock);
2577 spin_unlock_irqrestore(&exp->exp_lock, irqflags);
2579 class_export_put(exp);
2583 /* FIXME: Can we avoid using these two interfaces? */
/* Export init: prepare the open-files list/lock and mark connecting. */
2584 static int mdt_init_export(struct obd_export *exp)
2586 struct mdt_export_data *med = &exp->exp_mdt_data;
2589 INIT_LIST_HEAD(&med->med_open_head);
2590 spin_lock_init(&med->med_open_lock);
2591 exp->exp_connecting = 1;
/*
 * Export destroy: close every open file recorded on the export (which
 * may trigger orphan unlinking) inside a private lu_context, then free
 * the per-client data.  Self-exports (uuid == obd uuid) are skipped.
 */
2595 static int mdt_destroy_export(struct obd_export *export)
2597 struct mdt_export_data *med;
2598 struct obd_device *obd = export->exp_obd;
2599 struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
2600 struct lu_context ctxt;
2604 med = &export->exp_mdt_data;
2605 target_destroy_export(export);
2607 if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
2610 rc = lu_context_init(&ctxt, LCT_MD_THREAD);
2614 lu_context_enter(&ctxt);
2615 /* Close any open files (which may also cause orphan unlinking). */
2616 spin_lock(&med->med_open_lock);
2617 while (!list_empty(&med->med_open_head)) {
2618 struct list_head *tmp = med->med_open_head.next;
2619 struct mdt_file_data *mfd =
2620 list_entry(tmp, struct mdt_file_data, mfd_list);
2622 /* Remove mfd handle so it can't be found again.
2623 * We are consuming the mfd_list reference here. */
2624 class_handle_unhash(&mfd->mfd_handle);
2625 list_del_init(&mfd->mfd_list);
/* drop the lock across mdt_mfd_close(), which may sleep */
2626 spin_unlock(&med->med_open_lock);
2627 mdt_mfd_close(&ctxt, mdt, mfd);
2628 spin_lock(&med->med_open_lock);
2630 spin_unlock(&med->med_open_lock);
2631 mdt_client_free(&ctxt, mdt, med);
2633 lu_context_exit(&ctxt);
2634 lu_context_fini(&ctxt);
/*
 * Upcall from lower layers: refresh the cached max MD / llog-cookie
 * sizes from the child device; unknown events are rejected.
 */
2639 static int mdt_upcall(const struct lu_context *ctx, struct md_device *md,
2640 enum md_upcall_event ev)
2642 struct mdt_device *m = mdt_dev(&md->md_lu_dev);
2643 struct md_device *next = m->mdt_child;
2649 rc = next->md_ops->mdo_get_maxsize(ctx, next,
2650 &m->mdt_max_mdsize, &m->mdt_max_cookiesize);
2651 CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d \n",
2652 m->mdt_max_mdsize, m->mdt_max_cookiesize);
2655 CERROR("invalid event\n");
/* obd method table registered with class_register_type(). */
2663 static struct obd_ops mdt_obd_device_ops = {
2664 .o_owner = THIS_MODULE,
2665 .o_connect = mdt_obd_connect,
2666 .o_disconnect = mdt_obd_disconnect,
2667 .o_init_export = mdt_init_export, /* By Huang Hua*/
2668 .o_destroy_export = mdt_destroy_export, /* By Huang Hua*/
/* Free the mdt_device wrapper for a lu_device. */
2671 static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
2673 struct mdt_device *m = mdt_dev(d);
/*
 * Allocate and fully initialize an mdt_device via mdt_init0(); on
 * success install the upcall handler, on failure return an ERR_PTR.
 */
2679 static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
2680 struct lu_device_type *t,
2681 struct lustre_cfg *cfg)
2683 struct lu_device *l;
2684 struct mdt_device *m;
2690 l = &m->mdt_md_dev.md_lu_dev;
2691 result = mdt_init0(ctx, m, t, cfg);
2694 l = ERR_PTR(result);
2696 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
2698 l = ERR_PTR(-ENOMEM);
2703 * context key constructor/destructor
/*
 * lu_context key for per-thread mdt_thread_info: allocated on context
 * init, freed on fini.  CLASSERT guards against >1-page allocations.
 */
2706 static void *mdt_thread_init(const struct lu_context *ctx,
2707 struct lu_context_key *key)
2709 struct mdt_thread_info *info;
2712 * check that no high order allocations are incurred.
2714 CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
2715 OBD_ALLOC_PTR(info);
2717 info = ERR_PTR(-ENOMEM);
2721 static void mdt_thread_fini(const struct lu_context *ctx,
2722 struct lu_context_key *key, void *data)
2724 struct mdt_thread_info *info = data;
2728 struct lu_context_key mdt_thread_key = {
2729 .lct_tags = LCT_MD_THREAD,
2730 .lct_init = mdt_thread_init,
2731 .lct_fini = mdt_thread_fini
/* lu_context key for per-transaction mdt_txn_info (LCT_TX_HANDLE). */
2734 static void *mdt_txn_init(const struct lu_context *ctx,
2735 struct lu_context_key *key)
2737 struct mdt_txn_info *txi;
2740 * check that no high order allocations are incurred.
2742 CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
2745 txi = ERR_PTR(-ENOMEM);
2749 static void mdt_txn_fini(const struct lu_context *ctx,
2750 struct lu_context_key *key, void *data)
2752 struct mdt_txn_info *txi = data;
2756 struct lu_context_key mdt_txn_key = {
2757 .lct_tags = LCT_TX_HANDLE,
2758 .lct_init = mdt_txn_init,
2759 .lct_fini = mdt_txn_fini
/* Register / deregister the thread and txn context keys for this type. */
2763 static int mdt_type_init(struct lu_device_type *t)
2767 rc = lu_context_key_register(&mdt_thread_key);
2769 rc = lu_context_key_register(&mdt_txn_key);
2773 static void mdt_type_fini(struct lu_device_type *t)
2775 lu_context_key_degister(&mdt_thread_key);
2776 lu_context_key_degister(&mdt_txn_key);
/* lu_device_type and its operations for the MDT device class. */
2779 static struct lu_device_type_operations mdt_device_type_ops = {
2780 .ldto_init = mdt_type_init,
2781 .ldto_fini = mdt_type_fini,
2783 .ldto_device_alloc = mdt_device_alloc,
2784 .ldto_device_free = mdt_device_free
2787 static struct lu_device_type mdt_device_type = {
2788 .ldt_tags = LU_DEVICE_MD,
2789 .ldt_name = LUSTRE_MDT0_NAME,
2790 .ldt_ops = &mdt_device_type_ops,
2791 .ldt_ctx_tags = LCT_MD_THREAD
/* /proc variable tables (currently empty placeholders). */
2794 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
2798 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
2802 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
/* Module entry: set default thread count, register the MDT obd type. */
2804 static int __init mdt_mod_init(void)
2807 struct lprocfs_static_vars lvars;
2809 mdt_num_threads = MDT_NUM_THREADS;
2810 lprocfs_init_vars(mdt, &lvars);
2811 result = class_register_type(&mdt_obd_device_ops, NULL,
2812 lvars.module_vars, LUSTRE_MDT0_NAME,
/* Module exit: undo the type registration. */
2817 static void __exit mdt_mod_exit(void)
2819 class_unregister_type(LUSTRE_MDT0_NAME);
/*
 * Handler-table entry builders.  DEF_HNDL indexes the entry by opcode
 * offset from the table's base opcode and fills in fail id, opcode and
 * flags; the MDT variants fix the base at MDS_GETATTR.
 */
2823 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt) \
2824 [prefix ## _ ## opc - prefix ## _ ## base] = { \
2826 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
2827 .mh_opc = prefix ## _ ## opc, \
2828 .mh_flags = flags, \
2833 #define DEF_MDT_HNDL(flags, name, fn, fmt) \
2834 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
2836 * Request with a format known in advance
2838 #define DEF_MDT_HNDL_F(flags, name, fn) \
2839 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
2841 * Request with a format we do not yet know
2843 #define DEF_MDT_HNDL_0(flags, name, fn) \
2844 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
/* Handler table for MDS_* opcodes on the main request portal. */
2846 static struct mdt_handler mdt_mds_ops[] = {
2847 DEF_MDT_HNDL_F(0, CONNECT, mdt_connect),
2848 DEF_MDT_HNDL_F(0, DISCONNECT, mdt_disconnect),
2849 DEF_MDT_HNDL_F(0 |HABEO_REFERO, GETSTATUS, mdt_getstatus),
2850 DEF_MDT_HNDL_F(HABEO_CORPUS , GETATTR, mdt_getattr),
2851 DEF_MDT_HNDL_F(HABEO_CORPUS , GETATTR_NAME, mdt_getattr_name),
2852 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO|MUTABOR,
2853 SETXATTR, mdt_setxattr),
2854 DEF_MDT_HNDL_F(HABEO_CORPUS, GETXATTR, mdt_getxattr),
2855 DEF_MDT_HNDL_F(0 |HABEO_REFERO, STATFS, mdt_statfs),
2856 DEF_MDT_HNDL_F(0 |MUTABOR,
2858 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, CLOSE, mdt_close),
2859 DEF_MDT_HNDL_0(0, DONE_WRITING, mdt_done_writing),
2860 DEF_MDT_HNDL_F(0 |HABEO_REFERO, PIN, mdt_pin),
2861 DEF_MDT_HNDL_0(0, SYNC, mdt_sync),
2862 DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_handle_quotacheck),
2863 DEF_MDT_HNDL_0(0, QUOTACTL, mdt_handle_quotactl)
/* OBD opcode handler, based at OBD_PING; no predeclared request format. */
2866 #define DEF_OBD_HNDL(flags, name, fn) \
2867 DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
/*
 * Dispatch table for generic OBD opcodes (ping, log cancel, quota
 * callback).  NOTE(review): closing brace is not visible in this chunk.
 */
2870 static struct mdt_handler mdt_obd_ops[] = {
2871 DEF_OBD_HNDL(0, PING,           mdt_obd_ping),
2872 DEF_OBD_HNDL(0, LOG_CANCEL,     mdt_obd_log_cancel),
2873 DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback)
/* LDLM opcode handlers, based at LDLM_ENQUEUE; _0 = no format, _F = RQF. */
2876 #define DEF_DLM_HNDL_0(flags, name, fn) \
2877 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
2878 #define DEF_DLM_HNDL_F(flags, name, fn) \
2879 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
/*
 * Dispatch table for DLM lock opcodes.  HABEO_CLAVIS marks handlers
 * that require the lock "key" precondition (enqueue/convert).
 * NOTE(review): closing brace is not visible in this chunk.
 */
2881 static struct mdt_handler mdt_dlm_ops[] = {
2882 DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
2883 DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT,        mdt_convert),
2884 DEF_DLM_HNDL_0(0,            BL_CALLBACK,    mdt_bl_callback),
2885 DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
2888 static struct mdt_handler mdt_llog_ops[] = {
/*
 * Opcode-slice map for the main MDT service: each slice gives the
 * [start, end) opcode range and the handler table that serves it.
 * Request dispatch presumably walks these slices to find the handler
 * for an incoming opcode -- TODO confirm against the dispatch code.
 * NOTE(review): the per-entry braces and the array's closing brace
 * are not visible in this chunk.
 */
2891 static struct mdt_opc_slice mdt_handlers[] = {
/* Plain MDS opcodes. */
2893                 .mos_opc_start = MDS_GETATTR,
2894                 .mos_opc_end   = MDS_LAST_OPC,
2895                 .mos_hs        = mdt_mds_ops
/* Generic OBD opcodes. */
2898                 .mos_opc_start = OBD_PING,
2899                 .mos_opc_end   = OBD_LAST_OPC,
2900                 .mos_hs        = mdt_obd_ops
/* DLM lock opcodes. */
2903                 .mos_opc_start = LDLM_ENQUEUE,
2904                 .mos_opc_end   = LDLM_LAST_OPC,
2905                 .mos_hs        = mdt_dlm_ops
/* Llog origin-handle opcodes. */
2908                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
2909                 .mos_opc_end   = LLOG_LAST_OPC,
2910                 .mos_hs        = mdt_llog_ops
/*
 * Handler table for the dedicated readpage service: only READPAGE is
 * visibly registered, requiring both a body and a reference.
 * NOTE(review): further entries and/or the closing brace fall outside
 * the visible lines of this chunk.
 */
2917 static struct mdt_handler mdt_mds_readpage_ops[] = {
2918 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
/*
 * Opcode-slice map for the readpage service: one slice covering the
 * MDS opcode range, served by mdt_mds_readpage_ops above.
 * NOTE(review): per-entry braces and the array's closing brace are
 * not visible in this chunk.
 */
2921 static struct mdt_opc_slice mdt_readpage_handlers[] = {
2923                 .mos_opc_start = MDS_GETATTR,
2924                 .mos_opc_end   = MDS_LAST_OPC,
2925                 .mos_hs        = mdt_mds_readpage_ops
/* Standard kernel module metadata. */
2932 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2933 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT0_NAME")");
2934 MODULE_LICENSE("GPL");
/*
 * Read-only (mode 0444) module parameter overriding the default
 * mdt_num_threads set in mdt_mod_init().
 */
2936 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
2937                 "number of mdt service threads to start");
/* libcfs wrapper declaring the module's init/exit entry points. */
2939 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);