1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/mdt/mdt_handler.c
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
13 * Author: Huang Hua <huanghua@clusterfs.com>
15 * This file is part of the Lustre file system, http://www.lustre.org
16 * Lustre is a trademark of Cluster File Systems, Inc.
18 * You may have signed or agreed to another license before downloading
19 * this software. If so, you are bound by the terms and conditions
20 * of that agreement, and the following does not apply to you. See the
21 * LICENSE file included with this distribution for more information.
23 * If you did not agree to a different license, then this copy of Lustre
24 * is open source software; you can redistribute it and/or modify it
25 * under the terms of version 2 of the GNU General Public License as
26 * published by the Free Software Foundation.
28 * In either case, Lustre is distributed in the hope that it will be
29 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 * license text for more details.
35 # define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_MDS
39 #include <linux/module.h>
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
44 * struct OBD_{ALLOC,FREE}*()
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
55 #include <dt_object.h>
56 #include <lustre_mds.h>
57 #include <lustre_mdt.h>
58 #include "mdt_internal.h"
59 #include <linux/lustre_acl.h>
/* Number of MDT service threads to start; set up in mdt_mod_init()
 * (presumably a module parameter — NOTE(review): confirm, the parameter
 * declaration is not visible in this listing). */
61 * Initialized in mdt_mod_init().
63 unsigned long mdt_num_threads;
65 /* ptlrpc request handler for MDT. All handlers are
66 * grouped into several slices - struct mdt_opc_slice,
67 * and stored in an array - mdt_handlers[].
/* NOTE(review): the "struct mdt_handler {" line and the declarations of
 * mh_name, mh_fail_id, mh_opc and mh_flags are missing from this listing
 * (only their doc comments remain); their existence is evidenced by uses
 * in mdt_req_handle()/mdt_handler_find() below.  Confirm against the
 * complete source. */
70 /* The name of this handler. */
72 /* Fail id for this handler, checked at the beginning of this handler*/
74 /* Operation code for this handler */
76 /* flags are listed in enum mdt_handler_flags below. */
78 /* The actual handler function to execute. */
79 int (*mh_act)(struct mdt_thread_info *info);
80 /* Request format for this request. */
81 const struct req_format *mh_fmt;
/* Per-handler behavior flags, tested by mdt_req_handle().  Latin mnemonics
 * are explained inline. */
84 enum mdt_handler_flags {
86 * struct mdt_body is passed in the incoming message, and object
87 * identified by this fid exists on disk.
89 * "habeo corpus" == "I have a body"
91 HABEO_CORPUS = (1 << 0),
93 * struct ldlm_request is passed in the incoming message.
95 * "habeo clavis" == "I have a key"
97 HABEO_CLAVIS = (1 << 1),
99 * this request has fixed reply format, so that reply message can be
100 * packed by generic code.
102 * "habeo refero" == "I have a reply"
104 HABEO_REFERO = (1 << 2),
106 * this request will modify something, so check whether the filesystem
107 * is readonly or not, then return -EROFS to client asap if necessary.
109 * "mutabor" == "I shall modify"
/* NOTE(review): the "MUTABOR = (1 << 3)" enumerator and closing "};" are
 * missing from this listing; MUTABOR is referenced by mdt_req_handle()
 * below, so the enumerator must exist in the full source. */
/* A contiguous range of opcodes [mos_opc_start, mos_opc_end) mapped to an
 * array of handlers; see mdt_handler_find().
 * NOTE(review): the mos_opc_start/mos_opc_end field declarations and the
 * closing "};" are missing from this listing — they are referenced by
 * mdt_handler_find() below, so they must exist in the full source. */
114 struct mdt_opc_slice {
117 struct mdt_handler *mos_hs;
120 static struct mdt_opc_slice mdt_regular_handlers[];
121 static struct mdt_opc_slice mdt_readpage_handlers[];
122 static struct mdt_opc_slice mdt_seq_handlers[];
123 static struct mdt_opc_slice mdt_fld_handlers[];
125 static struct mdt_device *mdt_dev(struct lu_device *d);
126 static int mdt_regular_handle(struct ptlrpc_request *req);
127 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
129 static struct lu_object_operations mdt_obj_ops;
/* Test whether intent disposition @flag is set in the ldlm reply.
 * NOTE(review): interior lines (opening brace, likely a NULL check on
 * @rep) are missing from this listing — confirm against full source. */
131 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
135 return (rep->lock_policy_res1 & flag);
/* Record intent disposition @flag both in per-request state (mti_opdata)
 * and in the ldlm reply returned to the client.
 * NOTE(review): missing lines probably guard the @rep dereference (rep can
 * be NULL on non-intent paths, see mdt_getattr_name) — confirm. */
138 void mdt_set_disposition(struct mdt_thread_info *info,
139 struct ldlm_reply *rep, int flag)
142 info->mti_opdata |= flag;
144 rep->lock_policy_res1 |= flag;
/* MDS_GETSTATUS handler: ask the underlying md_device for the filesystem
 * root fid and pack it into the reply mdt_body.
 * NOTE(review): the fail-check branch body, the fid argument to
 * mdo_root_get() and the RETURN path are in lines missing from this
 * listing. */
148 static int mdt_getstatus(struct mdt_thread_info *info)
150 struct md_device *next = info->mti_mdt->mdt_child;
152 struct mdt_body *body;
156 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
159 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
160 rc = next->md_ops->mdo_root_get(info->mti_ctxt,
163 body->valid |= OBD_MD_FLID;
/* MDS_STATFS handler: fetch filesystem statistics from the lower md layer
 * into the scratch kstatfs (mti_u.ksfs) and pack them as obd_statfs in
 * the reply. */
169 static int mdt_statfs(struct mdt_thread_info *info)
171 struct md_device *next = info->mti_mdt->mdt_child;
172 struct obd_statfs *osfs;
/* Fault-injection point: sleep long enough to trip the service watchdog. */
177 /* This will trigger a watchdog timeout */
178 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
179 (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
182 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
185 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
186 /* XXX max_age optimisation is needed here. See mds_statfs */
187 rc = next->md_ops->mdo_statfs(info->mti_ctxt,
188 next, &info->mti_u.ksfs);
/* Convert kernel statfs to wire obd_statfs in the reply buffer. */
189 statfs_pack(osfs, &info->mti_u.ksfs);
/* Translate in-memory lu_attr into the wire-format mdt_body @b and set
 * the corresponding OBD_MD_* validity bits.  For regular files size and
 * blocks come from the OSTs, so those bits are only set for non-regular
 * objects here. */
195 void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
196 const struct lu_fid *fid)
198 /*XXX should pack the reply body according to lu_valid*/
199 b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID |
200 OBD_MD_FLGID | OBD_MD_FLTYPE |
201 OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
202 OBD_MD_FLATIME | OBD_MD_FLMTIME ;
204 if (!S_ISREG(attr->la_mode))
205 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
207 b->atime = attr->la_atime;
208 b->mtime = attr->la_mtime;
209 b->ctime = attr->la_ctime;
210 b->mode = attr->la_mode;
211 b->size = attr->la_size;
212 b->blocks = attr->la_blocks;
213 b->uid = attr->la_uid;
214 b->gid = attr->la_gid;
215 b->flags = attr->la_flags;
216 b->nlink = attr->la_nlink;
217 b->rdev = attr->la_rdev;
/* NOTE(review): missing lines here presumably copy @fid into b->fid1
 * before marking FLID valid — confirm against full source. */
221 b->valid |= OBD_MD_FLID;
222 CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
223 PFID(fid), b->nlink, b->mode, b->size);
/* True when the client asked for striping EA appropriate to the object
 * type: LOV EA for a regular file, or directory EA for a directory. */
227 static inline int mdt_body_has_lov(const struct lu_attr *la,
228 const struct mdt_body *body)
230 return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
231 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
/* Core getattr: read attributes (and optionally LOV EA, symlink target,
 * ACL) of @o from the lower md layer and pack them into the reply body.
 * -EREMOTE from mo_attr_get() means the object lives on another MDS; in
 * that case only fid + OBD_MD_MDS are returned so the client can retry
 * there.
 * NOTE(review): several interior lines (rc declaration, RETURN paths,
 * else-branches, declarations of "buffer"/"length" used in the ACL block)
 * are missing from this listing. */
234 static int mdt_getattr_internal(struct mdt_thread_info *info,
235 struct mdt_object *o)
237 struct md_object *next = mdt_object_child(o);
238 const struct mdt_body *reqbody = info->mti_body;
239 struct ptlrpc_request *req = mdt_info_req(info);
240 struct md_attr *ma = &info->mti_attr;
241 struct lu_attr *la = &ma->ma_attr;
242 struct req_capsule *pill = &info->mti_pill;
243 const struct lu_context *ctxt = info->mti_ctxt;
244 struct mdt_body *repbody;
250 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
253 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
254 repbody->eadatasize = 0;
255 repbody->aclsize = 0;
/* The MD buffer in the reply doubles as scratch space for EA/symlink. */
257 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
258 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER);
260 ma->ma_need = MA_INODE | MA_LOV;
261 rc = mo_attr_get(ctxt, next, ma);
262 if (rc == -EREMOTE) {
263 /* This object is located on remote node.*/
264 repbody->fid1 = *mdt_object_fid(o);
265 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
268 CERROR("getattr error for "DFID": %d\n",
269 PFID(mdt_object_fid(o)), rc);
273 if (ma->ma_valid & MA_INODE)
274 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
/* Striping EA was requested and the object type matches. */
278 if (mdt_body_has_lov(la, reqbody)) {
279 if (ma->ma_valid & MA_LOV) {
280 LASSERT(ma->ma_lmm_size);
281 mdt_dump_lmm(D_INFO, ma->ma_lmm);
282 repbody->eadatasize = ma->ma_lmm_size;
283 if (S_ISDIR(la->la_mode))
284 repbody->valid |= OBD_MD_FLDIREA;
286 repbody->valid |= OBD_MD_FLEASIZE;
/* Symlink: read the target into the MD buffer instead of EA data. */
288 } else if (S_ISLNK(la->la_mode) &&
289 reqbody->valid & OBD_MD_LINKNAME) {
290 rc = mo_readlink(ctxt, next, ma->ma_lmm, ma->ma_lmm_size);
292 CERROR("readlink failed: %d\n", rc);
295 repbody->valid |= OBD_MD_LINKNAME;
/* rc is the link length on success; account for the added NUL. */
296 repbody->eadatasize = rc + 1;
297 ((char*)ma->ma_lmm)[rc] = 0; /* NULL terminate */
298 CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
299 (char*)ma->ma_lmm, rc);
/* Client wants to learn the server-side max EA/cookie sizes. */
304 if (reqbody->valid & OBD_MD_FLMODEASIZE) {
305 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
306 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
307 repbody->valid |= OBD_MD_FLMODEASIZE;
308 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
309 "MAX_COOKIE to : %d:%d\n",
311 repbody->max_cookiesize);
/* POSIX ACL: fetched as the access-ACL xattr into the ACL reply buffer;
 * absence (-ENODATA) or no xattr support (-EOPNOTSUPP) is not an error. */
314 #ifdef CONFIG_FS_POSIX_ACL
315 if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
316 (reqbody->valid & OBD_MD_FLACL)) {
317 buffer = req_capsule_server_get(pill, &RMF_ACL);
318 length = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
320 rc = mo_xattr_get(ctxt, next, buffer,
321 length, XATTR_NAME_ACL_ACCESS);
323 if (rc == -ENODATA || rc == -EOPNOTSUPP)
326 CERROR("got acl size: %d\n", rc);
328 repbody->aclsize = rc;
329 repbody->valid |= OBD_MD_FLACL;
/* MDS_GETATTR handler: plain (lock-less) getattr on the object already
 * resolved from the request body fid; shrinks the variable-size reply
 * buffers afterwards. */
338 static int mdt_getattr(struct mdt_thread_info *info)
341 struct mdt_object *obj;
343 obj = info->mti_object;
344 LASSERT(obj != NULL);
345 LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
348 rc = mdt_getattr_internal(info, obj);
349 mdt_shrink_reply(info, REPLY_REC_OFF + 1);
354 * UPDATE lock should be taken against parent, and be release before exit;
355 * child_bits lock should be taken against child, and be returned back:
356 * (1)normal request should release the child lock;
357 * (2)intent request will grant the lock to client.
/* Lock-based getattr-by-name: looks @name up under the parent's UPDATE
 * lock, locks the child with @child_bits, fills the getattr reply, and
 * leaves the child lock in @lhc for the caller to release or grant.
 * NOTE(review): many interior lines (early-out when name is empty and
 * parent is remote, error branches, EXIT/RETURN paths, out_child label)
 * are missing from this listing. */
359 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
360 struct mdt_lock_handle *lhc,
362 struct ldlm_reply *ldlm_rep)
364 struct mdt_object *parent = info->mti_object;
365 struct mdt_object *child;
366 struct md_object *next = mdt_object_child(info->mti_object);
367 struct lu_fid *child_fid = &info->mti_tmp_fid1;
370 struct mdt_lock_handle *lhp;
373 LASSERT(info->mti_object != NULL);
374 name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
378 CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
379 PFID(mdt_object_fid(parent)), name, ldlm_rep);
381 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
/* Empty name: cross-MDS case, mti_object is already the child itself. */
382 if (strlen(name) == 0) {
383 /* only getattr on the child. parent is on another node. */
384 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
386 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
388 PFID(mdt_object_fid(child)), ldlm_rep);
390 mdt_lock_handle_init(lhc);
391 lhc->mlh_mode = LCK_CR;
392 rc = mdt_object_lock(info, child, lhc, child_bits);
394 /* finally, we can get attr for child. */
395 rc = mdt_getattr_internal(info, child);
397 mdt_object_unlock(info, child, lhc, 1);
402 /*step 1: lock parent */
403 lhp = &info->mti_lh[MDT_LH_PARENT];
404 lhp->mlh_mode = LCK_CR;
405 rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
409 /*step 2: lookup child's fid by name */
410 rc = mdo_lookup(info->mti_ctxt, next, name, child_fid);
413 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
414 GOTO(out_parent, rc);
416 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
418 *step 3: find the child object by fid & lock it.
419 * regardless if it is local or remote.
421 mdt_lock_handle_init(lhc);
422 lhc->mlh_mode = LCK_CR;
423 child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
425 GOTO(out_parent, rc = PTR_ERR(child));
427 /* finally, we can get attr for child. */
428 rc = mdt_getattr_internal(info, child);
430 mdt_object_unlock(info, child, lhc, 1);
/* Sanity check that the returned lock really covers the child's fid. */
432 /* This is pure debugging code. */
433 struct ldlm_lock *lock;
434 struct ldlm_res_id *res_id;
435 lock = ldlm_handle2lock(&lhc->mlh_lh);
437 res_id = &lock->l_resource->lr_name;
438 LDLM_DEBUG(lock, "we will return this lock client\n");
439 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
440 &lock->l_resource->lr_name),
441 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
442 (unsigned long)res_id->name[0],
443 (unsigned long)res_id->name[1],
444 (unsigned long)res_id->name[2],
445 PFID(mdt_object_fid(child)));
449 mdt_object_put(info->mti_ctxt, child);
/* out_parent: parent UPDATE lock is always released before returning. */
453 mdt_object_unlock(info, parent, lhp, 1);
458 /* normal handler: should release the child lock */
/* MDS_GETATTR_NAME handler: non-intent path — perform the lock-based
 * getattr, then immediately drop the child lock instead of granting it. */
459 static int mdt_getattr_name(struct mdt_thread_info *info)
461 struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
466 rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
467 if (lustre_handle_is_used(&lhc->mlh_lh)) {
468 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
469 lhc->mlh_lh.cookie = 0;
471 mdt_shrink_reply(info, REPLY_REC_OFF + 1);
475 static struct lu_device_operations mdt_lu_ops;
/* True iff @d is an MDT device, recognized by its ops vtable pointer. */
477 static int lu_device_is_mdt(struct lu_device *d)
479 return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
/* Downcast a generic lu_device to its containing mdt_device. */
482 static inline struct mdt_device *mdt_dev(struct lu_device *d)
484 LASSERT(lu_device_is_mdt(d));
485 return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
/* MDS_CONNECT handler: delegate to generic target connect; on success the
 * request now has an export, so cache the mdt_device in thread info (it
 * could not be set earlier — see mdt_thread_info_init). */
488 static int mdt_connect(struct mdt_thread_info *info)
491 struct ptlrpc_request *req;
493 req = mdt_info_req(info);
494 rc = target_handle_connect(req, mdt_regular_handle);
496 LASSERT(req->rq_export != NULL);
497 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
/* MDS_DISCONNECT handler: generic target disconnect does all the work. */
502 static int mdt_disconnect(struct mdt_thread_info *info)
504 return target_handle_disconnect(mdt_info_req(info));
/* Push the pages described by @rdpg to the client as a bulk PUT, waiting
 * up to obd_timeout/4 for completion.  On timeout/network error the
 * export is failed (client evicted).
 * NOTE(review): several interior lines (rc/i/tmpcount declarations,
 * portal argument of ptlrpc_prep_bulk_exp, GOTO labels, RETURN) are
 * missing from this listing. */
507 static int mdt_sendpage(struct mdt_thread_info *info,
508 struct lu_rdpg *rdpg)
510 struct ptlrpc_request *req = mdt_info_req(info);
511 struct ptlrpc_bulk_desc *desc;
512 struct l_wait_info *lwi = &info->mti_u.rdpg.mti_wait_info;
519 desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
522 GOTO(out, rc = -ENOMEM);
/* Attach each page; the last page may be partially filled. */
524 for (i = 0, tmpcount = rdpg->rp_count;
525 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
526 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
527 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
530 LASSERT(desc->bd_nob == rdpg->rp_count);
531 rc = ptlrpc_start_bulk_transfer(desc);
535 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
536 GOTO(abort_bulk, rc);
538 *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
539 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
540 LASSERT (rc == 0 || rc == -ETIMEDOUT);
543 if (desc->bd_success &&
544 desc->bd_nob_transferred == rdpg->rp_count)
547 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
550 DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
551 (rc == -ETIMEDOUT) ? "timeout" : "network error",
552 desc->bd_nob_transferred, rdpg->rp_count,
553 req->rq_export->exp_client_uuid.uuid,
554 req->rq_export->exp_connection->c_remote_uuid.uuid);
/* Bulk failed: evict the client rather than leave it half-synced. */
556 class_fail_export(req->rq_export);
560 ptlrpc_abort_bulk(desc);
562 ptlrpc_free_bulk(desc);
567 #ifdef HAVE_SPLIT_SUPPORT
569 * Retrieve dir entry from the page and insert it to the
570 * slave object, actually, this should be in osd layer,
571 * but since it will not in the final product, so just do
572 * it here and do not define more moo api anymore for
/* Split-dir support: walk the lu_dirent entries in @page and insert each
 * name->fid mapping into the slave directory object.
 * NOTE(review): rc declaration, loop-error handling and RETURN are in
 * missing lines. */
575 static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page)
577 struct mdt_object *object = info->mti_object;
578 struct lu_dirpage *dp;
579 struct lu_dirent *ent;
583 dp = page_address(page);
584 for (ent = lu_dirent_start(dp); ent != NULL;
585 ent = lu_dirent_next(ent)) {
586 struct lu_fid *lf = &ent->lde_fid;
588 /* FIXME: check isdir */
589 rc = mdo_name_insert(info->mti_ctxt,
590 md_object_next(&object->mot_obj),
591 ent->lde_name, lf, 0);
592 /* FIXME: add cross_flags */
/* l_wait_event interval callback for bulk GET: deliberately does NOT fail
 * the export (see comment), just lets the wait loop continue/expire. */
602 static int mdt_bulk_timeout(void *data)
605 /* We don't fail the connection here, because having the export
606 * killed makes the (vital) call to commitrw very sad.
/* MDS_WRITEPAGE handler: pull one page of directory entries from the
 * client via bulk GET and feed it to mdt_write_dir_page().  Error paths
 * clean up lwi/page/desc in reverse order of acquisition.
 * NOTE(review): numerous interior lines (rc/page declarations, lwi
 * allocation, eviction check, GOTO labels and RETURN) are missing from
 * this listing. */
611 static int mdt_writepage(struct mdt_thread_info *info)
613 struct ptlrpc_request *req = mdt_info_req(info);
614 struct l_wait_info *lwi;
615 struct ptlrpc_bulk_desc *desc;
620 desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
624 /* allocate the page for the desc */
625 page = alloc_pages(GFP_KERNEL, 0);
627 GOTO(desc_cleanup, rc = -ENOMEM);
629 ptlrpc_prep_bulk_page(desc, page, 0, CFS_PAGE_SIZE);
631 /* FIXME: following parts are copied from ost_brw_write */
633 /* Check if client was evicted while we were doing i/o before touching
637 GOTO(cleanup_page, rc = -ENOMEM);
639 if (desc->bd_export->exp_failed)
642 rc = ptlrpc_start_bulk_transfer (desc);
644 *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
645 mdt_bulk_timeout, desc);
646 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
647 desc->bd_export->exp_failed, lwi);
648 LASSERT(rc == 0 || rc == -ETIMEDOUT);
649 if (rc == -ETIMEDOUT) {
650 DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
651 ptlrpc_abort_bulk(desc);
652 } else if (desc->bd_export->exp_failed) {
653 DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
655 ptlrpc_abort_bulk(desc);
656 } else if (!desc->bd_success ||
657 desc->bd_nob_transferred != desc->bd_nob) {
658 DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
660 "truncated" : "network error on",
661 desc->bd_nob_transferred, desc->bd_nob);
662 /* XXX should this be a different errno? */
666 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
669 GOTO(cleanup_lwi, rc);
/* Bulk succeeded: apply received directory entries to the object. */
670 rc = mdt_write_dir_page(info, page);
675 __free_pages(page, 0);
677 ptlrpc_free_bulk(desc);
/* MDS_READPAGE handler: allocate rp_npages pages, have the lower layer
 * fill them with directory entries starting at the requested hash, then
 * bulk-send them to the client via mdt_sendpage().
 * NOTE(review): rc/i declarations, error RETURNs between steps, and the
 * free_rdpg label line are missing from this listing. */
682 static int mdt_readpage(struct mdt_thread_info *info)
684 struct mdt_object *object = info->mti_object;
685 struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
686 struct mdt_body *reqbody;
687 struct mdt_body *repbody;
692 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
695 reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
696 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
697 if (reqbody == NULL || repbody == NULL)
/* Wire-protocol reuse: "size" carries the start hash, "nlink" carries the
 * byte count to read (documented below). */
701 * prepare @rdpg before calling lower layers and transfer itself. Here
702 * reqbody->size contains offset of where to start to read and
703 * reqbody->nlink contains number bytes to read.
705 rdpg->rp_hash = reqbody->size;
706 if ((__u64)rdpg->rp_hash != reqbody->size) {
707 CERROR("Invalid hash: %#llx != %#llx\n",
708 (__u64)rdpg->rp_hash, reqbody->size);
711 rdpg->rp_count = reqbody->nlink;
712 rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
713 OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
714 if (rdpg->rp_pages == NULL)
717 for (i = 0; i < rdpg->rp_npages; ++i) {
718 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
719 if (rdpg->rp_pages[i] == NULL)
720 GOTO(free_rdpg, rc = -ENOMEM);
723 /* call lower layers to fill allocated pages with directory data */
724 rc = mo_readpage(info->mti_ctxt, mdt_object_child(object), rdpg);
728 /* send pages to client */
729 rc = mdt_sendpage(info, rdpg);
/* free_rdpg: release all pages and the page-pointer array. */
733 for (i = 0; i < rdpg->rp_npages; i++)
734 if (rdpg->rp_pages[i] != NULL)
735 __free_pages(rdpg->rp_pages[i], 0);
736 OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
738 MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
/* Common reintegration path: unpack the reint record for opcode @op, size
 * and pack the reply capsule, handle RESENT replay via the client's
 * last_xid (reconstruct instead of re-executing), then dispatch to
 * mdt_reint_rec().
 * NOTE(review): rc declaration, error RETURNs after unpack/pack, and the
 * closing of the RESENT branch are in lines missing from this listing. */
743 static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
745 struct req_capsule *pill = &info->mti_pill;
746 struct mdt_device *mdt = info->mti_mdt;
747 struct ptlrpc_request *req = mdt_info_req(info);
751 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
754 rc = mdt_reint_unpack(info, op);
/* Variable-size reply fields must be sized before packing. */
759 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
760 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
761 mdt->mdt_max_mdsize);
762 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
763 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
764 mdt->mdt_max_cookiesize);
765 rc = req_capsule_pack(pill);
/* Resent request already executed: rebuild the reply from saved state. */
769 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
770 struct mdt_client_data *mcd;
772 mcd = req->rq_export->exp_mdt_data.med_mcd;
773 if (mcd->mcd_last_xid == req->rq_xid) {
774 mdt_reconstruct(info);
775 RETURN(lustre_msg_get_status(req->rq_repmsg));
777 DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
780 rc = mdt_reint_rec(info);
/* Extract the reint opcode from the request and, if it is a supported
 * opcode with a registered format, extend the capsule to that format.
 * Returns the opcode (negative errno on failure — NOTE(review): the
 * opc assignment from *ptr and error returns are in missing lines). */
785 static long mdt_reint_opcode(struct mdt_thread_info *info,
786 const struct req_format **fmt)
792 ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
795 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
796 if (opc < REINT_MAX && fmt[opc] != NULL)
797 req_capsule_extend(&info->mti_pill, fmt[opc]);
799 CERROR("Unsupported opc: %ld\n", opc);
/* MDS_REINT handler: map the reint opcode to its request format and run
 * the common reintegration path; arm the reply-side fail id afterwards. */
804 static int mdt_reint(struct mdt_thread_info *info)
809 static const struct req_format *reint_fmts[REINT_MAX] = {
810 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
811 [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
812 [REINT_LINK] = &RQF_MDS_REINT_LINK,
813 [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
814 [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
815 [REINT_OPEN] = &RQF_MDS_REINT_OPEN
820 opc = mdt_reint_opcode(info, reint_fmts);
822 rc = mdt_reint_internal(info, opc);
826 info->mti_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
830 /* TODO these two methods not available now. */
832 /* this should sync the whole device */
/* Stub: device-wide sync, not yet implemented (bodies not visible). */
833 static int mdt_device_sync(struct mdt_thread_info *info)
838 /* this should sync this object */
/* Stub: single-object sync, not yet implemented (bodies not visible). */
839 static int mdt_object_sync(struct mdt_thread_info *info)
/* MDS_SYNC handler: a zero fid sequence means "sync the whole device";
 * otherwise sync one object and return its fresh attributes in the reply.
 * NOTE(review): rc declaration, several error RETURNs and closing braces
 * are in lines missing from this listing. */
844 static int mdt_sync(struct mdt_thread_info *info)
846 struct req_capsule *pill = &info->mti_pill;
847 struct mdt_body *body;
/* The fid may be all-zero, which normal body unpacking would reject. */
851 /* The fid may be zero, so we req_capsule_set manually */
852 req_capsule_set(pill, &RQF_MDS_SYNC);
854 body = req_capsule_client_get(pill, &RMF_MDT_BODY);
858 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
861 if (fid_seq(&body->fid1) == 0) {
862 /* sync the whole device */
863 rc = req_capsule_pack(pill);
865 rc = mdt_device_sync(info);
868 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS|HABEO_REFERO);
870 rc = mdt_object_sync(info);
/* Object sync succeeded: refetch attributes and pack them in the reply. */
872 struct md_object *next;
873 const struct lu_fid *fid;
876 next = mdt_object_child(info->mti_object);
877 fid = mdt_object_fid(info->mti_object);
878 info->mti_attr.ma_need = MA_INODE;
879 rc = mo_attr_get(info->mti_ctxt, next,
881 la = &info->mti_attr.ma_attr;
883 body = req_capsule_server_get(pill,
885 mdt_pack_attr2body(body, la, fid);
/* Quota handler stubs (bodies not visible in this listing). */
893 static int mdt_quotacheck_handle(struct mdt_thread_info *info)
898 static int mdt_quotactl_handle(struct mdt_thread_info *info)
904 * OBD PING and other handlers.
/* OBD_PING handler: generic target ping. */
906 static int mdt_obd_ping(struct mdt_thread_info *info)
910 rc = target_handle_ping(mdt_info_req(info));
/* OBD log-cancel / quota-callback handler stubs (bodies not visible). */
914 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
919 static int mdt_obd_qc_callback(struct mdt_thread_info *info)
/* LDLM callback suite passed to ldlm_handle_enqueue0(): standard server
 * completion/blocking ASTs. */
929 static struct ldlm_callback_suite cbs = {
930 .lcs_completion = ldlm_server_completion_ast,
931 .lcs_blocking = ldlm_server_blocking_ast,
/* LDLM_ENQUEUE handler: hand the already-unpacked dlm request (prepared
 * in mdt_req_handle via HABEO_CLAVIS) to the generic ldlm enqueue path. */
935 static int mdt_enqueue(struct mdt_thread_info *info)
938 struct ptlrpc_request *req;
941 * info->mti_dlm_req already contains swapped and (if necessary)
942 * converted dlm request.
944 LASSERT(info->mti_dlm_req != NULL);
946 req = mdt_info_req(info);
947 info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
948 rc = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
949 req, info->mti_dlm_req, &cbs);
/* Report the handler error if any, else the intent status. */
950 return rc ? : req->rq_status;
/* LDLM_CONVERT handler: delegate to the generic lock-convert path. */
953 static int mdt_convert(struct mdt_thread_info *info)
956 struct ptlrpc_request *req;
958 LASSERT(info->mti_dlm_req);
959 req = mdt_info_req(info);
960 rc = ldlm_handle_convert0(req, info->mti_dlm_req);
961 return rc ? : req->rq_status;
/* Blocking/completion callback opcodes are client-side notions; receiving
 * them on the MDS indicates a protocol error, so just complain. */
964 static int mdt_bl_callback(struct mdt_thread_info *info)
966 CERROR("bl callbacks should not happen on MDS\n");
971 static int mdt_cp_callback(struct mdt_thread_info *info)
973 CERROR("cp callbacks should not happen on MDS\n");
979 * Build (DLM) resource name from fid.
/* Maps fid (seq, oid, ver) onto the three words of an ldlm_res_id.
 * NOTE(review): the "return name;" line is missing from this listing but
 * is implied by the call site in fid_lock(). */
981 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
982 struct ldlm_res_id *name)
984 memset(name, 0, sizeof *name);
985 name->name[0] = fid_seq(f);
986 name->name[1] = fid_oid(f);
987 name->name[2] = fid_ver(f);
991 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
/* Take a local (server-side) IBITS lock on the resource derived from @f.
 * Any ldlm error is collapsed to -EIO for the caller. */
992 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
993 struct lustre_handle *lh, ldlm_mode_t mode,
994 ldlm_policy_data_t *policy,
995 struct ldlm_res_id *res_id)
997 int flags = 0; /*XXX: LDLM_FL_LOCAL_ONLY?*/
1000 LASSERT(ns != NULL);
1001 LASSERT(lh != NULL);
1004 rc = ldlm_cli_enqueue_local(ns, *fid_build_res_name(f, res_id),
1005 LDLM_IBITS, policy, mode, &flags,
1006 ldlm_blocking_ast, ldlm_completion_ast,
1007 NULL, NULL, 0, NULL, lh);
1008 return rc == ELDLM_OK ? 0 : -EIO;
1011 /* just call ldlm_lock_decref() if decref,
1012 * else we only call ptlrpc_save_lock() to save this lock in req.
1013 * when transaction committed, req will be released, and lock will, too */
1014 void fid_unlock(struct ptlrpc_request *req, const struct lu_fid *f,
1015 struct lustre_handle *lh, ldlm_mode_t mode, int decref)
/* Debug-only verification that @lh still names a lock on @f's resource. */
1018 /* FIXME: this is debug stuff, remove it later. */
1019 struct ldlm_lock *lock = ldlm_handle2lock(lh);
1021 CERROR("invalid lock handle "LPX64, lh->cookie);
1024 LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
1025 LDLM_LOCK_PUT(lock);
1028 ldlm_lock_decref(lh, mode);
/* Non-decref path: lock is released when the transaction commits. */
1030 ptlrpc_save_lock(req, lh, mode);
/* Downcast a generic lu_object to its containing mdt_object. */
1033 static struct mdt_object *mdt_obj(struct lu_object *o)
1035 LASSERT(lu_device_is_mdt(o->lo_dev));
1036 return container_of0(o, struct mdt_object, mot_obj.mo_lu);
/* Look up (or instantiate) the mdt_object for fid @f in device @d's site
 * cache.  NOTE(review): the IS_ERR handling between lu_object_find() and
 * the cast is in lines missing from this listing. */
1039 struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
1040 struct mdt_device *d,
1041 const struct lu_fid *f)
1043 struct lu_object *o;
1044 struct mdt_object *m;
1047 o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
1049 m = (struct mdt_object *)o;
/* Take an IBITS lock with bits @ibits on object @o using the mode already
 * set in @lh; the handle must not already be in use. */
1055 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
1056 struct mdt_lock_handle *lh, __u64 ibits)
1058 ldlm_policy_data_t *policy = &info->mti_policy;
1059 struct ldlm_res_id *res_id = &info->mti_res_id;
1060 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1064 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1065 LASSERT(lh->mlh_mode != LCK_MINMODE);
1067 policy->l_inodebits.bits = ibits;
1069 rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode,
/* Release (decref) or save-until-commit the lock in @lh, if taken, and
 * clear the handle; no-op on an unused handle. */
1074 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
1075 struct mdt_lock_handle *lh, int decref)
1077 struct ptlrpc_request *req = mdt_info_req(info);
1080 if (lustre_handle_is_used(&lh->mlh_lh)) {
1081 fid_unlock(req, mdt_object_fid(o),
1082 &lh->mlh_lh, lh->mlh_mode, decref);
1083 lh->mlh_lh.cookie = 0;
/* Find the object for @f and lock it with @ibits; on lock failure the
 * reference is dropped.  NOTE(review): the rc declaration, IS_ERR check
 * on @o, ERR_PTR conversion and return are in missing lines. */
1088 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
1089 const struct lu_fid *f,
1090 struct mdt_lock_handle *lh,
1093 struct mdt_object *o;
1095 o = mdt_object_find(info->mti_ctxt, info->mti_mdt, f);
1099 rc = mdt_object_lock(info, o, lh, ibits);
1101 mdt_object_put(info->mti_ctxt, o);
/* Convenience: unlock then drop the reference taken by
 * mdt_object_find_lock(). */
1108 void mdt_object_unlock_put(struct mdt_thread_info * info,
1109 struct mdt_object * o,
1110 struct mdt_lock_handle *lh,
1113 mdt_object_unlock(info, o, lh, decref);
1114 mdt_object_put(info->mti_ctxt, o);
/* Locate the handler for @opc by scanning the opcode slices: each slice
 * covers [mos_opc_start, mos_opc_end) and indexes directly into its
 * handler array.  NULL when the opcode falls in no slice. */
1117 static struct mdt_handler *mdt_handler_find(__u32 opc,
1118 struct mdt_opc_slice *supported)
1120 struct mdt_opc_slice *s;
1121 struct mdt_handler *h;
1124 for (s = supported; s->mos_hs != NULL; s++) {
1125 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1126 h = s->mos_hs + (opc - s->mos_opc_start);
1128 LASSERT(h->mh_opc == opc);
1130 h = NULL; /* unsupported opc */
/* Accessors for the client's last-replied xids stored (little-endian on
 * disk) in the per-export mdt client data. */
1137 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
1139 return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid);
1142 static inline __u64 req_exp_last_close_xid(struct ptlrpc_request *req)
1144 return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_close_xid);
/* Compatibility shims for old-style lock resource names / replies —
 * placeholders, not implemented yet. */
1147 static int mdt_lock_resname_compat(struct mdt_device *m,
1148 struct ldlm_request *req)
1150 /* XXX something... later. */
1154 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
1156 /* XXX something... later. */
1161 * Generic code handling requests that have struct mdt_body passed in:
1163 * - extract mdt_body from request and save it in @info, if present;
1165 * - create lu_object, corresponding to the fid in mdt_body, and save it in
1168 * - if HABEO_CORPUS flag is set for this request type check whether object
1169 * actually exists on storage (lu_object_exists()).
/* NOTE(review): rc declaration, the NULL-body check, IS_ERR handling of
 * mdt_object_find() and the RETURN paths are in lines missing from this
 * listing. */
1172 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1174 const struct mdt_body *body;
1175 struct mdt_object *obj;
1176 const struct lu_context *ctx;
1177 struct req_capsule *pill;
1180 ctx = info->mti_ctxt;
1181 pill = &info->mti_pill;
1183 body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1185 if (fid_is_sane(&body->fid1)) {
1186 obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
/* HABEO_CORPUS demands the object exist on disk; drop it otherwise. */
1188 if ((flags & HABEO_CORPUS) &&
1189 !lu_object_exists(&obj->mot_obj.mo_lu)) {
1190 mdt_object_put(ctx, obj);
1193 info->mti_object = obj;
1199 CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
/* Unpack the request body (if the format has one) and, for HABEO_REFERO
 * handlers, size the variable reply fields and pack the reply capsule. */
1207 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1209 struct req_capsule *pill;
1213 pill = &info->mti_pill;
1215 if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
1216 rc = mdt_body_unpack(info, flags);
1220 if (rc == 0 && (flags & HABEO_REFERO)) {
1221 struct mdt_device *mdt = info->mti_mdt;
/* Reply buffers for EA and llog cookies use server-wide maxima. */
1223 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1224 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1225 mdt->mdt_max_mdsize);
1226 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1227 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1228 mdt->mdt_max_cookiesize);
1230 rc = req_capsule_pack(pill);
1235 struct lu_context_key mdt_txn_key;
/* Fill recovery-related reply fields: transno, last_committed and the
 * client's last xid.  Skipped when there is no packed reply or when the
 * handler flagged the transaction as not needing a transno. */
1237 static inline void mdt_finish_reply(struct mdt_thread_info *info, int rc)
1239 struct mdt_device *mdt = info->mti_mdt;
1240 struct ptlrpc_request *req = mdt_info_req(info);
1241 struct obd_export *exp = req->rq_export;
1243 /* sometimes the reply message has not been successfully packed */
1244 if (mdt == NULL || req == NULL || req->rq_repmsg == NULL)
1247 if (info->mti_trans_flags & MDT_NONEED_TRANSNO)
1250 /*XXX: assert on this when all code will be finished */
/* A failed operation must not consume a transno. */
1251 if (rc != 0 && info->mti_transno != 0) {
1252 info->mti_transno = 0;
1253 CERROR("Transno is not 0 while rc is %i!\n", rc);
1256 CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1257 info->mti_transno, exp->exp_obd->obd_last_committed);
/* transno and last_committed are published atomically w.r.t. other
 * replies via mdt_transno_lock. */
1259 spin_lock(&mdt->mdt_transno_lock);
1260 req->rq_transno = info->mti_transno;
1261 lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
1263 target_committed_to_req(req);
1265 spin_unlock(&mdt->mdt_transno_lock);
1266 lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
1267 //lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
1271 * Invoke handler for this request opc. Also do necessary preprocessing
1272 * (according to handler ->mh_flags), and post-processing (setting of
1273 * ->last_{xid,committed}).
/* NOTE(review): rc/flags declarations, several error branches (-EROFS for
 * MUTABOR on read-only export, -EFAULT on dlm unpack failure) and the
 * RETURN are in lines missing from this listing. */
1275 static int mdt_req_handle(struct mdt_thread_info *info,
1276 struct mdt_handler *h, struct ptlrpc_request *req)
1283 LASSERT(h->mh_act != NULL);
1284 LASSERT(h->mh_opc == lustre_msg_get_opc(req->rq_reqmsg));
1285 LASSERT(current->journal_info == NULL);
1287 DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
/* Per-handler fault-injection point (drop request before any work). */
1289 if (h->mh_fail_id != 0)
1290 MDT_FAIL_RETURN(h->mh_fail_id, 0);
1293 flags = h->mh_flags;
1294 LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
1296 if (h->mh_fmt != NULL) {
1297 req_capsule_set(&info->mti_pill, h->mh_fmt);
1298 rc = mdt_unpack_req_pack_rep(info, flags);
/* MUTABOR handlers must be rejected early on read-only exports. */
1301 if (rc == 0 && flags & MUTABOR &&
1302 req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
/* HABEO_CLAVIS: unpack (and maybe res-name-convert) the dlm request. */
1305 if (rc == 0 && flags & HABEO_CLAVIS) {
1306 struct ldlm_request *dlm_req;
1308 LASSERT(h->mh_fmt != NULL);
1310 dlm_req = req_capsule_client_get(&info->mti_pill,&RMF_DLM_REQ);
1311 if (dlm_req != NULL) {
1312 if (info->mti_mdt->mdt_opts.mo_compat_resname)
1313 rc = mdt_lock_resname_compat(info->mti_mdt,
1315 info->mti_dlm_req = dlm_req;
1317 CERROR("Can't unpack dlm request\n");
1326 rc = h->mh_act(info);
1328 * XXX result value is unconditionally shoved into ->rq_status
1329 * (original code sometimes placed error code into ->rq_status, and
1330 * sometimes returned it to the
1331 * caller). ptlrpc_server_handle_request() doesn't check return value
1334 req->rq_status = rc;
1336 LASSERT(current->journal_info == NULL);
/* Mirror the resname conversion on the reply for old clients. */
1338 if (flags & HABEO_CLAVIS && info->mti_mdt->mdt_opts.mo_compat_resname) {
1339 struct ldlm_reply *dlmrep;
1341 dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1343 rc = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
1346 /* If we're DISCONNECTing, the mdt_export_data is already freed */
1349 if (h->mh_opc != MDS_DISCONNECT &&
1350 h->mh_opc != MDS_READPAGE &&
1351 h->mh_opc != LDLM_ENQUEUE) {
1352 mdt_finish_reply(info, req->rq_status);
/* Reset a lock handle to the "unused" state: zero cookie, no mode. */
1358 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1360 lh->mlh_lh.cookie = 0ull;
1361 lh->mlh_mode = LCK_MINMODE;
/* Verify a lock handle was released before the owner is torn down. */
1364 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1366 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
/*
 * Prepare per-thread request context for one incoming request: clear
 * @info, mark all reply buffer sizes as unset (-1), init lock handles,
 * and attach the request's service-thread lu_context and capsule.
 */
1369 static void mdt_thread_info_init(struct ptlrpc_request *req,
1370 struct mdt_thread_info *info)
1374 memset(info, 0, sizeof(*info));
1376 info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1377 for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1378 info->mti_rep_buf_size[i] = -1;
1380 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1381 mdt_lock_handle_init(&info->mti_lh[i]);
1383 info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1384 info->mti_ctxt = req->rq_svc_thread->t_ctx;
1385 info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1386 /* it can be NULL while CONNECT */
1388 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1389 req_capsule_init(&info->mti_pill, req, RCL_SERVER,
1390 info->mti_rep_buf_size)
/*
 * Tear down per-request thread context: finalize the capsule, drop the
 * object reference (if any), and check all lock handles were released.
 */
1393 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1397 req_capsule_fini(&info->mti_pill);
1398 if (info->mti_object != NULL) {
1399 mdt_object_put(info->mti_ctxt, info->mti_object);
1400 info->mti_object = NULL;
1402 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1403 mdt_lock_handle_fini(&info->mti_lh[i]);
1407 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
1408 struct obd_device *obd, int *process);
1410 * Handle recovery. Return:
1411 * +1: continue request processing;
1412 * -ve: abort immediately with the given error code;
1413 * 0: send reply with error code in req->rq_status;
/*
 * Recovery gate for a request.  Returns (per the comment above):
 *   +1 continue processing, -ve abort with error, 0 send error reply.
 * CONNECT is always let through; other ops require a connected export.
 */
1415 static int mdt_recovery(struct ptlrpc_request *req)
1419 struct obd_device *obd;
1423 if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CONNECT)
1426 if (req->rq_export == NULL) {
1427 CERROR("operation %d on unconnected MDS from %s\n",
1428 lustre_msg_get_opc(req->rq_reqmsg),
1429 libcfs_id2str(req->rq_peer));
1430 req->rq_status = -ENOTCONN;
1434 /* sanity check: if the xid matches, the request must be marked as a
1435 * resent or replayed */
1436 LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req) ||
1437 req->rq_xid == req_exp_last_close_xid(req),
1438 lustre_msg_get_flags(req->rq_reqmsg) &
1439 (MSG_RESENT | MSG_REPLAY)),
1440 "rq_xid "LPU64" matches last_xid, "
1441 "expected RESENT flag\n", req->rq_xid);
1443 /* else: note the opposite is not always true; a RESENT req after a
1444 * failover will usually not match the last_xid, since it was likely
1445 * never committed. A REPLAYed request will almost never match the
1446 * last xid, however it could for a committed, but still retained,
1449 obd = req->rq_export->exp_obd;
1451 /* Check for aborted recovery... */
/* Snapshot recovery state under the BH lock; acted on below. */
1452 spin_lock_bh(&obd->obd_processing_task_lock);
1453 abort_recovery = obd->obd_abort_recovery;
1454 recovering = obd->obd_recovering;
1455 spin_unlock_bh(&obd->obd_processing_task_lock);
1456 if (abort_recovery) {
1457 target_abort_recovery(obd);
1458 } else if (recovering) {
/* While recovering, defer to the MDS filter to decide whether this
 * request may be processed now. */
1462 rc = mds_filter_recovery_request(req, obd, &should_process);
1463 if (rc != 0 || !should_process) {
/*
 * Send (or queue) the reply for a processed request.  MSG_LAST_REPLAY
 * replies are queued until recovery completes; otherwise the reply is
 * sent immediately via target_send_reply().
 */
1470 static int mdt_reply(struct ptlrpc_request *req, int rc,
1471 struct mdt_thread_info *info)
1473 struct obd_device *obd;
1476 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
/* Only OBD_PING is expected to carry MSG_LAST_REPLAY. */
1477 if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1478 DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1480 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1481 if (obd && obd->obd_recovering) {
1482 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1483 RETURN(target_queue_final_reply(req, rc));
1485 /* Lost a race with recovery; let the error path
1487 rc = req->rq_status = -ENOTCONN;
1490 target_send_reply(req, rc, info->mti_fail_id);
1495 extern int mds_msg_check_version(struct lustre_msg *msg);
/*
 * Core request path: version check, recovery gate, handler lookup in
 * @supported, dispatch via mdt_req_handle(), then reply.  Unsupported
 * opcodes get -ENOTSUPP via ptlrpc_error().
 */
1497 static int mdt_handle0(struct ptlrpc_request *req,
1498 struct mdt_thread_info *info,
1499 struct mdt_opc_slice *supported)
1501 struct mdt_handler *h;
1502 struct lustre_msg *msg;
1507 MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1509 LASSERT(current->journal_info == NULL);
1511 msg = req->rq_reqmsg;
1512 rc = mds_msg_check_version(msg);
1514 rc = mdt_recovery(req);
1517 h = mdt_handler_find(lustre_msg_get_opc(msg),
1520 rc = mdt_req_handle(info, h, req);
/* No handler registered for this opcode. */
1522 req->rq_status = -ENOTSUPP;
1523 rc = ptlrpc_error(req);
1528 rc = mdt_reply(req, rc, info);
1531 CERROR(LUSTRE_MDT_NAME" drops mal-formed request\n");
1536 * MDT handler function called by ptlrpc service thread when request comes.
1538 * XXX common "target" functionality should be factored into separate module
1539 * shared by mdt, ost and stand-alone services like fld.
/*
 * Common entry for all MDT ptlrpc services: fetch the per-thread
 * mdt_thread_info from the service thread's lu_context, bracket
 * mdt_handle0() with info init/fini.
 */
1541 static int mdt_handle_common(struct ptlrpc_request *req,
1542 struct mdt_opc_slice *supported)
1544 struct lu_context *ctx;
1545 struct mdt_thread_info *info;
1549 ctx = req->rq_svc_thread->t_ctx;
1550 LASSERT(ctx != NULL);
1551 LASSERT(ctx->lc_thread == req->rq_svc_thread);
1552 info = lu_context_key_get(ctx, &mdt_thread_key);
1553 LASSERT(info != NULL);
1555 mdt_thread_info_init(req, info);
1557 rc = mdt_handle0(req, info, supported);
1559 mdt_thread_info_fini(info);
/* Entry point for the regular (metadata) service. */
1563 static int mdt_regular_handle(struct ptlrpc_request *req)
1565 return mdt_handle_common(req, mdt_regular_handlers);
/* Entry point for the readpage service. */
1568 static int mdt_readpage_handle(struct ptlrpc_request *req)
1570 return mdt_handle_common(req, mdt_readpage_handlers);
/* Entry point for the sequence-controller service. */
1573 static int mdt_mdsc_handle(struct ptlrpc_request *req)
1575 return mdt_handle_common(req, mdt_seq_handlers);
/* Entry point for the metadata sequence-server service. */
1578 static int mdt_mdss_handle(struct ptlrpc_request *req)
1580 return mdt_handle_common(req, mdt_seq_handlers);
/* Entry point for the data sequence-server service. */
1583 static int mdt_dtss_handle(struct ptlrpc_request *req)
1585 return mdt_handle_common(req, mdt_seq_handlers);
/* Entry point for the FLD (FID location database) service. */
1588 static int mdt_fld_handle(struct ptlrpc_request *req)
1590 return mdt_handle_common(req, mdt_fld_handlers);
1606 static int mdt_intent_getattr(enum mdt_it_code opcode,
1607 struct mdt_thread_info *info,
1608 struct ldlm_lock **,
1610 static int mdt_intent_reint(enum mdt_it_code opcode,
1611 struct mdt_thread_info *info,
1612 struct ldlm_lock **,
/*
 * Table of intent "flavors", indexed by enum mdt_it_code: request
 * format, capsule flags, policy callback and (for reint intents) the
 * reint opcode to run.  NOTE(review): several entry labels are elided
 * in this excerpt; entries visible without a label presumably precede
 * MDT_IT_GETATTR — confirm against the full source.
 */
1615 static struct mdt_it_flavor {
1616 const struct req_format *it_fmt;
1618 int (*it_act)(enum mdt_it_code ,
1619 struct mdt_thread_info *,
1620 struct ldlm_lock **,
1623 } mdt_it_flavor[] = {
1625 .it_fmt = &RQF_LDLM_INTENT,
1626 /*.it_flags = HABEO_REFERO,*/
1628 .it_act = mdt_intent_reint,
1629 .it_reint = REINT_OPEN
1632 .it_fmt = &RQF_LDLM_INTENT,
1633 .it_flags = MUTABOR,
1634 .it_act = mdt_intent_reint,
1635 .it_reint = REINT_OPEN
1638 .it_fmt = &RQF_LDLM_INTENT,
1639 .it_flags = MUTABOR,
1640 .it_act = mdt_intent_reint,
1641 .it_reint = REINT_CREATE
1643 [MDT_IT_GETATTR] = {
1644 .it_fmt = &RQF_LDLM_INTENT_GETATTR,
1645 .it_flags = HABEO_REFERO,
1646 .it_act = mdt_intent_getattr
1648 [MDT_IT_READDIR] = {
1654 .it_fmt = &RQF_LDLM_INTENT_GETATTR,
1655 .it_flags = HABEO_REFERO,
1656 .it_act = mdt_intent_getattr
1659 .it_fmt = &RQF_LDLM_INTENT_UNLINK,
1660 .it_flags = MUTABOR,
1661 .it_act = NULL, /* XXX can be mdt_intent_reint, ? */
1662 .it_reint = REINT_UNLINK
1666 .it_flags = MUTABOR,
1669 [MDT_IT_GETXATTR] = {
/*
 * Intent-getattr policy: perform the name lookup + getattr under a
 * temporary lock, then either abort the enqueue (negative/failed
 * lookup) or hand the acquired lock back to the client by replacing
 * *lockp (ELDLM_LOCK_REPLACED).
 */
1676 static int mdt_intent_getattr(enum mdt_it_code opcode,
1677 struct mdt_thread_info *info,
1678 struct ldlm_lock **lockp,
1681 struct ldlm_lock *old_lock = *lockp;
1682 struct ldlm_lock *new_lock = NULL;
1683 struct ptlrpc_request *req = mdt_info_req(info);
1684 struct ldlm_reply *ldlm_rep;
1685 struct mdt_lock_handle tmp_lock;
1686 struct mdt_lock_handle *lhc = &tmp_lock;
/* Choose inodebits by intent: plain lookup vs lookup+update. */
1693 child_bits = MDS_INODELOCK_LOOKUP;
1695 case MDT_IT_GETATTR:
1696 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1699 CERROR("Unhandled till now");
1703 ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1704 mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
1706 ldlm_rep->lock_policy_res2 =
1707 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
1708 mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
/* A negative lookup is a success as far as the intent goes. */
1710 if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
1711 ldlm_rep->lock_policy_res2 = 0;
1712 if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
1713 ldlm_rep->lock_policy_res2) {
1714 RETURN(ELDLM_LOCK_ABORTED);
1717 new_lock = ldlm_handle2lock(&lhc->mlh_lh);
1718 if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1721 LASSERTF(new_lock != NULL, "op %d lockh "LPX64"\n",
1722 opcode, lhc->mlh_lh.cookie);
1726 /* FIXME:This only happens when MDT can handle RESENT */
1727 if (new_lock->l_export == req->rq_export) {
1728 /* Already gave this to the client, which means that we
1729 * reconstructed a reply. */
1730 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1732 RETURN(ELDLM_LOCK_REPLACED);
1736 * These are copied from mds/hander.c, and should be factored into
1737 * ldlm module in order to share these code, and be easy for merge.
1740 /* Fixup the lock to be given to the client */
1741 lock_res_and_lock(new_lock);
1742 new_lock->l_readers = 0;
1743 new_lock->l_writers = 0;
/* Transfer ownership of the lock to the client's export and copy the
 * ASTs / remote handle from the lock the client originally enqueued. */
1745 new_lock->l_export = class_export_get(req->rq_export);
1746 list_add(&new_lock->l_export_chain,
1747 &new_lock->l_export->exp_ldlm_data.led_held_locks);
1749 new_lock->l_blocking_ast = old_lock->l_blocking_ast;
1750 new_lock->l_completion_ast = old_lock->l_completion_ast;
1752 new_lock->l_remote_handle = old_lock->l_remote_handle;
1754 new_lock->l_flags &= ~LDLM_FL_LOCAL;
1756 unlock_res_and_lock(new_lock);
1757 LDLM_LOCK_PUT(new_lock);
1759 RETURN(ELDLM_LOCK_REPLACED);
/*
 * Intent-reint policy (open/create): run the reint operation and store
 * its result in the DLM reply; the enqueue itself is always aborted
 * (ELDLM_LOCK_ABORTED) — the client gets the reint result instead.
 */
1762 static int mdt_intent_reint(enum mdt_it_code opcode,
1763 struct mdt_thread_info *info,
1764 struct ldlm_lock **lockp,
1769 struct ldlm_reply *rep;
1771 static const struct req_format *intent_fmts[REINT_MAX] = {
1772 [REINT_CREATE] = &RQF_LDLM_INTENT_CREATE,
1773 [REINT_OPEN] = &RQF_LDLM_INTENT_OPEN
1778 opc = mdt_reint_opcode(info, intent_fmts);
/* Sanity: reint opcode in the request must match the intent flavor. */
1782 if (mdt_it_flavor[opcode].it_reint != opc) {
1783 CERROR("Reint code %ld doesn't match intent: %d\n",
1788 rc = mdt_reint_internal(info, opc);
1790 rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1793 rep->lock_policy_res2 = rc;
1795 mdt_set_disposition(info, rep, DISP_IT_EXECD);
1797 mdt_finish_reply(info, rc);
1799 RETURN(ELDLM_LOCK_ABORTED);
/*
 * Map an LDLM intent opcode (IT_*) to the MDT-internal enum
 * mdt_it_code.  NOTE(review): most case labels are elided in this
 * excerpt; only a few mappings are visible.
 */
1802 static int mdt_intent_code(long itcode)
1810 case IT_OPEN|IT_CREAT:
1817 rc = MDT_IT_READDIR;
1820 rc = MDT_IT_GETATTR;
1832 rc = MDT_IT_GETXATTR;
1835 CERROR("Unknown intent opcode: %ld\n", itcode);
/*
 * Look up the flavor for an intent opcode, extend the capsule to the
 * flavor's format, unpack/pack per its flags, and run its policy
 * callback.  MUTABOR intents are rejected on read-only exports.
 */
1842 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
1843 struct ldlm_lock **lockp, int flags)
1845 struct req_capsule *pill;
1846 struct mdt_it_flavor *flv;
1851 opc = mdt_intent_code(itopc);
1855 pill = &info->mti_pill;
1856 flv = &mdt_it_flavor[opc];
1858 if (flv->it_fmt != NULL)
1859 req_capsule_extend(pill, flv->it_fmt);
1861 rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
1863 struct ptlrpc_request *req = mdt_info_req(info);
1864 if (flv->it_flags & MUTABOR &&
1865 req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1868 if (rc == 0 && flv->it_act != NULL) {
1869 /* execute policy */
1870 rc = flv->it_act(opc, info, lockp, flags);
/*
 * LDLM namespace intent-policy callback registered via
 * ldlm_register_intent().  If the request carries an intent record,
 * dispatch it through mdt_intent_opc(); otherwise pack a plain
 * enqueue reply.
 */
1876 static int mdt_intent_policy(struct ldlm_namespace *ns,
1877 struct ldlm_lock **lockp, void *req_cookie,
1878 ldlm_mode_t mode, int flags, void *data)
1880 struct mdt_thread_info *info;
1881 struct ptlrpc_request *req = req_cookie;
1882 struct ldlm_intent *it;
1883 struct req_capsule *pill;
1884 struct ldlm_lock *lock = *lockp;
1889 LASSERT(req != NULL);
1891 info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
1892 LASSERT(info != NULL);
1893 pill = &info->mti_pill;
1894 LASSERT(pill->rc_req == req);
/* An intent record is present iff there are extra message buffers. */
1896 if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
1897 req_capsule_extend(pill, &RQF_LDLM_INTENT);
1898 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
1900 LDLM_DEBUG(lock, "intent policy opc: %s",
1901 ldlm_it2str(it->opc));
1903 rc = mdt_intent_opc(it->opc, info, lockp, flags);
1909 /* No intent was provided */
1910 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
1911 rc = req_capsule_pack(pill);
/*
 * Tear down sequence managers attached to the site: server seq,
 * controller seq and client seq, freeing and NULLing each pointer.
 */
1919 static int mdt_seq_fini(const struct lu_context *ctx,
1920 struct mdt_device *m)
1922 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1925 if (ls && ls->ls_server_seq) {
1926 seq_server_fini(ls->ls_server_seq, ctx);
1927 OBD_FREE_PTR(ls->ls_server_seq);
1928 ls->ls_server_seq = NULL;
1931 if (ls && ls->ls_control_seq) {
1932 seq_server_fini(ls->ls_control_seq, ctx);
1933 OBD_FREE_PTR(ls->ls_control_seq);
1934 ls->ls_control_seq = NULL;
1937 if (ls && ls->ls_client_seq) {
1938 seq_client_fini(ls->ls_client_seq);
1939 OBD_FREE_PTR(ls->ls_client_seq);
1940 ls->ls_client_seq = NULL;
/*
 * Initialize sequence management for this MDT.  Node 0 additionally
 * hosts the sequence controller (server + local "ctl-<name>" client);
 * every node runs a local metadata seq server.  On any failure,
 * everything allocated so far is undone via mdt_seq_fini().
 */
1946 static int mdt_seq_init(const struct lu_context *ctx,
1948 struct mdt_device *m)
1955 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1958 * This is sequence-controller node. Init seq-controller server on local
1961 if (ls->ls_node_id == 0) {
1962 LASSERT(ls->ls_control_seq == NULL);
1964 OBD_ALLOC_PTR(ls->ls_control_seq);
1965 if (ls->ls_control_seq == NULL)
1968 rc = seq_server_init(ls->ls_control_seq,
1969 m->mdt_bottom, uuid,
1970 LUSTRE_SEQ_CONTROLLER,
1974 GOTO(out_seq_fini, rc);
1976 OBD_ALLOC_PTR(ls->ls_client_seq);
1977 if (ls->ls_client_seq == NULL)
1978 GOTO(out_seq_fini, rc = -ENOMEM);
1980 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
1981 if (prefix == NULL) {
1982 OBD_FREE_PTR(ls->ls_client_seq);
1983 GOTO(out_seq_fini, rc = -ENOMEM);
1986 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
1990 * Init seq-controller client after seq-controller server is
1991 * ready. Pass ls->ls_control_seq to it for direct talking.
1993 rc = seq_client_init(ls->ls_client_seq, NULL,
1994 LUSTRE_SEQ_METADATA, prefix,
1995 ls->ls_control_seq, ctx);
1996 OBD_FREE(prefix, MAX_OBD_NAME + 5);
1999 GOTO(out_seq_fini, rc);
2002 /* Init seq-server on local MDT */
2003 LASSERT(ls->ls_server_seq == NULL);
2005 OBD_ALLOC_PTR(ls->ls_server_seq);
2006 if (ls->ls_server_seq == NULL)
2007 GOTO(out_seq_fini, rc = -ENOMEM);
2009 rc = seq_server_init(ls->ls_server_seq,
2010 m->mdt_bottom, uuid,
2014 GOTO(out_seq_fini, rc = -ENOMEM);
2016 /* Assign seq-controller client to local seq-server. */
2017 if (ls->ls_node_id == 0) {
2018 LASSERT(ls->ls_client_seq != NULL);
2020 rc = seq_server_set_cli(ls->ls_server_seq,
2028 mdt_seq_fini(ctx, m);
2034 * Init client sequence manager which is used by local MDS to talk to sequence
2035 * controller on remote node.
/*
 * On a non-controller MDT, when the MDC for MDT0 is configured,
 * connect to the remote sequence controller and attach a seq client
 * ("ctl-<name>") to the local seq server.  cfg bufs: 1 = target uuid,
 * 2 = index, 4 = MDC uuid.
 */
2037 static int mdt_seq_init_cli(const struct lu_context *ctx,
2038 struct mdt_device *m,
2039 struct lustre_cfg *cfg)
2041 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2042 struct obd_device *mdc;
2043 struct obd_uuid *uuidp, *mdcuuidp;
2044 char *uuid_str, *mdc_uuid_str;
2047 struct mdt_thread_info *info;
2048 char *p, *index_string = lustre_cfg_string(cfg, 2);
2051 info = lu_context_key_get(ctx, &mdt_thread_key);
2052 uuidp = &info->mti_u.uuid[0];
2053 mdcuuidp = &info->mti_u.uuid[1];
2055 LASSERT(index_string);
2057 index = simple_strtol(index_string, &p, 10);
2059 CERROR("Invalid index in lustre_cgf, offset 2\n");
2063 /* check if this is adding the first MDC and controller is not yet
/* Only the first MDC (index 0) matters, and only once. */
2065 if (index != 0 || ls->ls_client_seq)
2068 uuid_str = lustre_cfg_string(cfg, 1);
2069 mdc_uuid_str = lustre_cfg_string(cfg, 4);
2070 obd_str2uuid(uuidp, uuid_str);
2071 obd_str2uuid(mdcuuidp, mdc_uuid_str);
2073 mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, mdcuuidp);
2075 CERROR("can't find controller MDC by uuid %s\n",
2078 } else if (!mdc->obd_set_up) {
2079 CERROR("target %s not set up\n", mdc->obd_name);
2082 struct lustre_handle conn = {0, };
2084 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
2085 mdc->obd_name, mdc->obd_uuid.uuid);
2087 rc = obd_connect(ctx, &conn, mdc, &mdc->obd_uuid, NULL);
2090 CERROR("target %s connect error %d\n",
2093 ls->ls_client_exp = class_conn2export(&conn);
2095 OBD_ALLOC_PTR(ls->ls_client_seq);
2097 if (ls->ls_client_seq != NULL) {
2100 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2104 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2107 rc = seq_client_init(ls->ls_client_seq,
2109 LUSTRE_SEQ_METADATA,
2110 prefix, NULL, NULL);
2111 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2118 LASSERT(ls->ls_server_seq != NULL);
2120 rc = seq_server_set_cli(ls->ls_server_seq,
/*
 * Detach the seq-controller client from the local seq server and
 * disconnect the controller export, if one was established.
 */
2129 static void mdt_seq_fini_cli(struct mdt_device *m)
2136 ls = m->mdt_md_dev.md_lu_dev.ld_site;
2138 if (ls && ls->ls_server_seq)
2139 seq_server_set_cli(ls->ls_server_seq,
2142 if (ls && ls->ls_client_exp) {
2143 rc = obd_disconnect(ls->ls_client_exp);
2145 CERROR("failure to disconnect "
2148 ls->ls_client_exp = NULL;
/*
 * Tear down the FLD server and client attached to the site, freeing
 * and NULLing each pointer.
 */
2156 static int mdt_fld_fini(const struct lu_context *ctx,
2157 struct mdt_device *m)
2159 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2162 if (ls && ls->ls_server_fld) {
2163 fld_server_fini(ls->ls_server_fld, ctx);
2164 OBD_FREE_PTR(ls->ls_server_fld);
2165 ls->ls_server_fld = NULL;
2168 if (ls && ls->ls_client_fld != NULL) {
2169 fld_client_fini(ls->ls_client_fld);
2170 OBD_FREE_PTR(ls->ls_client_fld);
2171 ls->ls_client_fld = NULL;
/*
 * Set up FLD for this MDT: a local FLD server plus a DHT-hashed FLD
 * client with this node registered as a target.  Cleans up via
 * mdt_fld_fini() on failure.
 */
2177 static int mdt_fld_init(const struct lu_context *ctx,
2179 struct mdt_device *m)
2181 struct lu_fld_target target;
2186 ls = m->mdt_md_dev.md_lu_dev.ld_site;
2188 OBD_ALLOC_PTR(ls->ls_server_fld);
2189 if (ls->ls_server_fld == NULL)
2190 RETURN(rc = -ENOMEM);
2192 rc = fld_server_init(ls->ls_server_fld, ctx,
2193 m->mdt_bottom, uuid);
2195 OBD_FREE_PTR(ls->ls_server_fld);
2196 ls->ls_server_fld = NULL;
2199 OBD_ALLOC_PTR(ls->ls_client_fld);
2200 if (!ls->ls_client_fld)
2201 GOTO(out_fld_fini, rc = -ENOMEM);
2203 rc = fld_client_init(ls->ls_client_fld, uuid,
2204 LUSTRE_CLI_FLD_HASH_DHT);
2206 CERROR("can't init FLD, err %d\n", rc);
2207 OBD_FREE_PTR(ls->ls_client_fld);
2208 GOTO(out_fld_fini, rc);
/* Register the local server as a target of the FLD client. */
2211 target.ft_srv = ls->ls_server_fld;
2212 target.ft_idx = ls->ls_node_id;
2213 target.ft_exp = NULL;
2215 fld_client_add_target(ls->ls_client_fld, &target);
2219 mdt_fld_fini(ctx, m);
2223 /* device init/fini methods */
/*
 * Unregister every ptlrpc service this MDT started (regular, readpage,
 * setattr, seq controller/server, data seq, FLD), NULLing each pointer
 * so the function is safe to call on a partially-started device.
 */
2224 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
2226 if (m->mdt_regular_service != NULL) {
2227 ptlrpc_unregister_service(m->mdt_regular_service);
2228 m->mdt_regular_service = NULL;
2230 if (m->mdt_readpage_service != NULL) {
2231 ptlrpc_unregister_service(m->mdt_readpage_service);
2232 m->mdt_readpage_service = NULL;
2234 if (m->mdt_setattr_service != NULL) {
2235 ptlrpc_unregister_service(m->mdt_setattr_service);
2236 m->mdt_setattr_service = NULL;
2238 if (m->mdt_mdsc_service != NULL) {
2239 ptlrpc_unregister_service(m->mdt_mdsc_service);
2240 m->mdt_mdsc_service = NULL;
2242 if (m->mdt_mdss_service != NULL) {
2243 ptlrpc_unregister_service(m->mdt_mdss_service);
2244 m->mdt_mdss_service = NULL;
2246 if (m->mdt_dtss_service != NULL) {
2247 ptlrpc_unregister_service(m->mdt_dtss_service);
2248 m->mdt_dtss_service = NULL;
2250 if (m->mdt_fld_service != NULL) {
2251 ptlrpc_unregister_service(m->mdt_fld_service);
2252 m->mdt_fld_service = NULL;
/*
 * Start all ptlrpc services for this MDT, in order: regular metadata,
 * readpage, setattr, sequence controller (mdsc), metadata sequence
 * server (mdss), data sequence server (dtss) and FLD.  Each service is
 * created from a freshly filled `conf` and its threads started; any
 * failure unwinds everything via mdt_stop_ptlrpc_service().
 */
2256 static int mdt_start_ptlrpc_service(struct mdt_device *m)
2259 static struct ptlrpc_service_conf conf;
/* Regular metadata service. */
2262 conf = (typeof(conf)) {
2263 .psc_nbufs = MDS_NBUFS,
2264 .psc_bufsize = MDS_BUFSIZE,
2265 .psc_max_req_size = MDS_MAXREQSIZE,
2266 .psc_max_reply_size = MDS_MAXREPSIZE,
2267 .psc_req_portal = MDS_REQUEST_PORTAL,
2268 .psc_rep_portal = MDC_REPLY_PORTAL,
2269 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2271 * We'd like to have a mechanism to set this on a per-device
2272 * basis, but alas...
2274 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2276 .psc_ctx_tags = LCT_MD_THREAD
2279 m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
2280 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2281 "mdt_ldlm_client", m->mdt_ldlm_client);
2283 m->mdt_regular_service =
2284 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
2285 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2287 if (m->mdt_regular_service == NULL)
2290 rc = ptlrpc_start_threads(NULL, m->mdt_regular_service, LUSTRE_MDT_NAME);
2292 GOTO(err_mdt_svc, rc);
2295 * readpage service configuration. Parameters have to be adjusted,
2298 conf = (typeof(conf)) {
2299 .psc_nbufs = MDS_NBUFS,
2300 .psc_bufsize = MDS_BUFSIZE,
2301 .psc_max_req_size = MDS_MAXREQSIZE,
2302 .psc_max_reply_size = MDS_MAXREPSIZE,
2303 .psc_req_portal = MDS_READPAGE_PORTAL,
2304 .psc_rep_portal = MDC_REPLY_PORTAL,
2305 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2306 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2308 .psc_ctx_tags = LCT_MD_THREAD
2310 m->mdt_readpage_service =
2311 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
2312 LUSTRE_MDT_NAME "_readpage",
2313 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2316 if (m->mdt_readpage_service == NULL) {
2317 CERROR("failed to start readpage service\n");
2318 GOTO(err_mdt_svc, rc = -ENOMEM);
2321 rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
2324 * setattr service configuration.
2326 conf = (typeof(conf)) {
2327 .psc_nbufs = MDS_NBUFS,
2328 .psc_bufsize = MDS_BUFSIZE,
2329 .psc_max_req_size = MDS_MAXREQSIZE,
2330 .psc_max_reply_size = MDS_MAXREPSIZE,
2331 .psc_req_portal = MDS_SETATTR_PORTAL,
2332 .psc_rep_portal = MDC_REPLY_PORTAL,
2333 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2334 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2336 .psc_ctx_tags = LCT_MD_THREAD
/* Note: setattr reuses mdt_regular_handle, only the portal differs. */
2339 m->mdt_setattr_service =
2340 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
2341 LUSTRE_MDT_NAME "_setattr",
2342 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2345 if (!m->mdt_setattr_service) {
2346 CERROR("failed to start setattr service\n");
2347 GOTO(err_mdt_svc, rc = -ENOMEM);
2350 rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
2352 GOTO(err_mdt_svc, rc);
2355 * sequence controller service configuration
2357 conf = (typeof(conf)) {
2358 .psc_nbufs = MDS_NBUFS,
2359 .psc_bufsize = MDS_BUFSIZE,
2360 .psc_max_req_size = SEQ_MAXREQSIZE,
2361 .psc_max_reply_size = SEQ_MAXREPSIZE,
2362 .psc_req_portal = SEQ_CONTROLLER_PORTAL,
2363 .psc_rep_portal = MDC_REPLY_PORTAL,
2364 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2365 .psc_num_threads = SEQ_NUM_THREADS,
2366 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2369 m->mdt_mdsc_service =
2370 ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
2371 LUSTRE_MDT_NAME"_mdsc",
2372 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2374 if (!m->mdt_mdsc_service) {
2375 CERROR("failed to start seq controller service\n");
2376 GOTO(err_mdt_svc, rc = -ENOMEM);
2379 rc = ptlrpc_start_threads(NULL, m->mdt_mdsc_service, "mdt_mdsc");
2381 GOTO(err_mdt_svc, rc);
2384 * metadata sequence server service configuration
2386 conf = (typeof(conf)) {
2387 .psc_nbufs = MDS_NBUFS,
2388 .psc_bufsize = MDS_BUFSIZE,
2389 .psc_max_req_size = SEQ_MAXREQSIZE,
2390 .psc_max_reply_size = SEQ_MAXREPSIZE,
2391 .psc_req_portal = SEQ_METADATA_PORTAL,
2392 .psc_rep_portal = MDC_REPLY_PORTAL,
2393 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2394 .psc_num_threads = SEQ_NUM_THREADS,
2395 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2398 m->mdt_mdss_service =
2399 ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
2400 LUSTRE_MDT_NAME"_mdss",
2401 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2403 if (!m->mdt_mdss_service) {
2404 CERROR("failed to start metadata seq server service\n");
2405 GOTO(err_mdt_svc, rc = -ENOMEM);
2408 rc = ptlrpc_start_threads(NULL, m->mdt_mdss_service, "mdt_mdss");
2410 GOTO(err_mdt_svc, rc);
2414 * Data sequence server service configuration. We want to have really
2415 * cluster-wide sequences space. This is why we start only one sequence
2416 * controller which manages space.
2418 conf = (typeof(conf)) {
2419 .psc_nbufs = MDS_NBUFS,
2420 .psc_bufsize = MDS_BUFSIZE,
2421 .psc_max_req_size = SEQ_MAXREQSIZE,
2422 .psc_max_reply_size = SEQ_MAXREPSIZE,
2423 .psc_req_portal = SEQ_DATA_PORTAL,
2424 .psc_rep_portal = OSC_REPLY_PORTAL,
2425 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2426 .psc_num_threads = SEQ_NUM_THREADS,
2427 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2430 m->mdt_dtss_service =
2431 ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
2432 LUSTRE_MDT_NAME"_dtss",
2433 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2435 if (!m->mdt_dtss_service) {
2436 CERROR("failed to start data seq server service\n");
2437 GOTO(err_mdt_svc, rc = -ENOMEM);
2440 rc = ptlrpc_start_threads(NULL, m->mdt_dtss_service, "mdt_dtss");
2442 GOTO(err_mdt_svc, rc);
2444 /* FLD service start */
2445 conf = (typeof(conf)) {
2446 .psc_nbufs = MDS_NBUFS,
2447 .psc_bufsize = MDS_BUFSIZE,
2448 .psc_max_req_size = FLD_MAXREQSIZE,
2449 .psc_max_reply_size = FLD_MAXREPSIZE,
2450 .psc_req_portal = FLD_REQUEST_PORTAL,
2451 .psc_rep_portal = MDC_REPLY_PORTAL,
2452 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2453 .psc_num_threads = FLD_NUM_THREADS,
2454 .psc_ctx_tags = LCT_DT_THREAD|LCT_MD_THREAD
2457 m->mdt_fld_service =
2458 ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
2459 LUSTRE_MDT_NAME"_fld",
2460 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2462 if (!m->mdt_fld_service) {
2463 CERROR("failed to start fld service\n");
2464 GOTO(err_mdt_svc, rc = -ENOMEM);
2467 rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, "mdt_fld");
2469 GOTO(err_mdt_svc, rc);
/* Error unwind: stop everything that did start. */
2474 mdt_stop_ptlrpc_service(m);
/*
 * Tear down the layered device stack below @top: send LCFG_CLEANUP
 * down the stack, purge the site's object cache, then walk the layers
 * calling fini/free and dropping each layer's obd type reference.
 */
2479 static void mdt_stack_fini(const struct lu_context *ctx,
2480 struct mdt_device *m, struct lu_device *top)
2482 struct lu_device *d = top, *n;
2483 struct lustre_cfg_bufs *bufs;
2484 struct lustre_cfg *lcfg;
2485 struct mdt_thread_info *info;
2488 info = lu_context_key_get(ctx, &mdt_thread_key);
2489 LASSERT(info != NULL);
2491 bufs = &info->mti_u.bufs;
2492 /* process cleanup */
2493 lustre_cfg_bufs_reset(bufs, NULL);
2494 lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
2496 CERROR("Cannot alloc lcfg!\n");
2500 top->ld_ops->ldo_process_config(ctx, top, lcfg);
2501 lustre_cfg_free(lcfg);
2503 lu_site_purge(ctx, top->ld_site, ~0);
2505 struct obd_type *type;
2506 struct lu_device_type *ldt = d->ld_type;
2508 /* each fini() returns next device in stack of layers
2509 * so we can avoid the recursion */
2510 n = ldt->ldt_ops->ldto_device_fini(ctx, d);
2512 ldt->ldt_ops->ldto_device_free(ctx, d);
2513 type = ldt->ldt_obd_type;
2515 class_put_type(type);
2517 /* switch to the next device in the layer */
2520 m->mdt_child = NULL;
/*
 * Allocate and initialize one layer of the device stack: look up the
 * obd type @typename, refill the context, allocate the device, share
 * @child's site and wire @child underneath.  Returns the new device
 * or an ERR_PTR; on error the type reference is dropped.
 */
2523 static struct lu_device *mdt_layer_setup(const struct lu_context *ctx,
2524 const char *typename,
2525 struct lu_device *child,
2526 struct lustre_cfg *cfg)
2528 struct obd_type *type;
2529 struct lu_device_type *ldt;
2530 struct lu_device *d;
2535 type = class_get_type(typename);
2537 CERROR("Unknown type: '%s'\n", typename);
2538 GOTO(out, rc = -ENODEV);
2541 rc = lu_context_refill(ctx);
2543 CERROR("Failure to refill context: '%d'\n", rc);
2549 CERROR("type: '%s'\n", typename);
2550 GOTO(out_type, rc = -EINVAL);
2553 ldt->ldt_obd_type = type;
2554 d = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
2556 CERROR("Cannot allocate device: '%s'\n", typename);
2557 GOTO(out_type, rc = -ENODEV);
2560 LASSERT(child->ld_site);
2561 d->ld_site = child->ld_site;
2564 rc = ldt->ldt_ops->ldto_device_init(ctx, d, child);
2566 CERROR("can't init device '%s', rc %d\n", typename, rc);
2567 GOTO(out_alloc, rc);
2574 ldt->ldt_ops->ldto_device_free(ctx, d);
2577 class_put_type(type);
/*
 * Build the MDT device stack (bottom to top: OSD -> MDD -> CMM),
 * wire the md upcall devices between layers, record the child, and
 * send the setup config down the stack.  On failure the partial
 * stack is torn down with mdt_stack_fini().
 */
2582 static int mdt_stack_init(const struct lu_context *ctx,
2583 struct mdt_device *m, struct lustre_cfg *cfg)
2585 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2586 struct lu_device *tmp;
2587 struct md_device *md;
2591 /* init the stack */
2592 tmp = mdt_layer_setup(ctx, LUSTRE_OSD_NAME, d, cfg);
2594 RETURN(PTR_ERR(tmp));
2596 m->mdt_bottom = lu2dt_dev(tmp);
2598 tmp = mdt_layer_setup(ctx, LUSTRE_MDD_NAME, d, cfg);
2600 GOTO(out, rc = PTR_ERR(tmp));
2605 tmp = mdt_layer_setup(ctx, LUSTRE_CMM_NAME, d, cfg);
2607 GOTO(out, rc = PTR_ERR(tmp));
2610 /*set mdd upcall device*/
2611 md->md_upcall.mu_upcall_dev = lu2md_dev(d);
2614 /*set cmm upcall device*/
2615 md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
2617 m->mdt_child = lu2md_dev(d);
2619 /* process setup config */
2620 tmp = &m->mdt_md_dev.md_lu_dev;
2621 rc = tmp->ld_ops->ldo_process_config(ctx, tmp, cfg);
2624 /* fini from last known good lu_device */
2626 mdt_stack_fini(ctx, m, d);
/*
 * Full device teardown, roughly the reverse of mdt_init0(): recovery
 * cleanup, fs cleanup, ping evictor, ptlrpc services, seq/fld, the
 * device stack, the LDLM namespace, and finally the md_device itself.
 */
2631 static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
2633 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2634 struct lu_site *ls = d->ld_site;
2637 target_cleanup_recovery(m->mdt_md_dev.md_lu_dev.ld_obd);
2638 mdt_fs_cleanup(ctx, m);
2639 ping_evictor_stop();
2640 mdt_stop_ptlrpc_service(m);
2642 mdt_seq_fini(ctx, m);
2643 mdt_seq_fini_cli(m);
2645 mdt_fld_fini(ctx, m);
2647 /* finish the stack */
2648 mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2650 if (m->mdt_namespace != NULL) {
2651 ldlm_namespace_free(m->mdt_namespace, 0);
2652 m->mdt_namespace = NULL;
/* All objects must be gone before the device is finalized. */
2660 LASSERT(atomic_read(&d->ld_ref) == 0);
2661 md_device_fini(&m->mdt_md_dev);
/*
 * Device setup: initialize locks and options, bind to the obd, init
 * the lu_site, build the device stack, init FLD and SEQ, create the
 * LDLM namespace with the intent policy, start ptlrpc services, the
 * ping evictor and the backing fs.  Each failure label unwinds the
 * steps completed so far, in reverse order.
 */
2666 static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
2667 struct lu_device_type *ldt, struct lustre_cfg *cfg)
2669 struct mdt_thread_info *info;
2670 struct obd_device *obd;
2671 const char *dev = lustre_cfg_string(cfg, 0);
2672 const char *num = lustre_cfg_string(cfg, 2);
2677 info = lu_context_key_get(ctx, &mdt_thread_key);
2678 LASSERT(info != NULL);
2680 obd = class_name2obd(dev);
2683 spin_lock_init(&m->mdt_transno_lock);
2685 m->mdt_max_mdsize = MAX_MD_SIZE;
2686 m->mdt_max_cookiesize = sizeof(struct llog_cookie);
2688 spin_lock_init(&m->mdt_epoch_lock);
2689 /* Temporary. should parse mount option. */
2690 m->mdt_opts.mo_user_xattr = 0;
2691 m->mdt_opts.mo_acl = 0;
2692 m->mdt_opts.mo_compat_resname = 0;
2693 obd->obd_replayable = 1;
2700 md_device_init(&m->mdt_md_dev, ldt);
2701 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
2702 m->mdt_md_dev.md_lu_dev.ld_obd = obd;
2703 /* set this lu_device to obd, because error handling need it */
2704 obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev;
2706 rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
2708 CERROR("can't init lu_site, rc %d\n", rc);
2709 GOTO(err_free_site, rc);
2712 /* init the stack */
2713 rc = mdt_stack_init(ctx, m, cfg);
2715 CERROR("can't init device stack, rc %d\n", rc);
2716 GOTO(err_fini_site, rc);
2719 /* set server index */
2721 s->ls_node_id = simple_strtol(num, NULL, 10);
2723 rc = mdt_fld_init(ctx, obd->obd_name, m);
2725 GOTO(err_fini_stack, rc);
2727 rc = mdt_seq_init(ctx, obd->obd_name, m);
2729 GOTO(err_fini_fld, rc);
2731 snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
2732 LUSTRE_MDT_NAME"-%p", m);
2733 m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
2734 LDLM_NAMESPACE_SERVER);
2735 if (m->mdt_namespace == NULL)
2736 GOTO(err_fini_seq, rc = -ENOMEM);
2738 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
2740 rc = mdt_start_ptlrpc_service(m);
2742 GOTO(err_free_ns, rc);
2744 ping_evictor_start();
2745 rc = mdt_fs_setup(ctx, m);
2747 GOTO(err_stop_service, rc);
/* Error unwind, reverse order of the setup steps above. */
2751 mdt_stop_ptlrpc_service(m);
2753 ldlm_namespace_free(m->mdt_namespace, 0);
2754 m->mdt_namespace = NULL;
2756 mdt_seq_fini(ctx, m);
2758 mdt_fld_fini(ctx, m);
2760 mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2766 md_device_fini(&m->mdt_md_dev);
2770 /* used by MGS to process specific configurations */
/*
 * Process an MGS configuration command: intercept the command that
 * adds the first MDC (to hook up the sequence-controller client via
 * mdt_seq_init_cli()); everything else is passed down the stack.
 */
2771 static int mdt_process_config(const struct lu_context *ctx,
2772 struct lu_device *d, struct lustre_cfg *cfg)
2774 struct mdt_device *m = mdt_dev(d);
2775 struct md_device *md_next = m->mdt_child;
2776 struct lu_device *next = md2lu_dev(md_next);
2780 switch (cfg->lcfg_command) {
2783 * Add mdc hook to get first MDT uuid and connect it to
2784 * ls->controller to use for seq manager.
2786 err = mdt_seq_init_cli(ctx, mdt_dev(d), cfg);
2788 CERROR("can't initialize controller export, "
2792 /* others are passed further */
2793 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
/*
 * lu_device ->ldo_object_alloc: allocate an mdt_object, init its
 * header, make it the top object of the compound and attach
 * mdt_obj_ops.
 */
2799 static struct lu_object *mdt_object_alloc(const struct lu_context *ctxt,
2800 const struct lu_object_header *hdr,
2801 struct lu_device *d)
2803 struct mdt_object *mo;
2809 struct lu_object *o;
2810 struct lu_object_header *h;
2812 o = &mo->mot_obj.mo_lu;
2813 h = &mo->mot_header;
2814 lu_object_header_init(h);
2815 lu_object_init(o, h, d);
2816 lu_object_add_top(h, o);
2817 o->lo_ops = &mdt_obj_ops;
2823 static int mdt_object_init(const struct lu_context *ctxt, struct lu_object *o)
2825 struct mdt_device *d = mdt_dev(o->lo_dev);
2826 struct lu_device *under;
2827 struct lu_object *below;
2831 CDEBUG(D_INFO, "object init, fid = "DFID"\n",
2832 PFID(lu_object_fid(o)));
2834 under = &d->mdt_child->md_lu_dev;
2835 below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
2836 if (below != NULL) {
2837 lu_object_add(o, below);
2843 static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o)
2845 struct mdt_object *mo = mdt_obj(o);
2846 struct lu_object_header *h;
2850 CDEBUG(D_INFO, "object free, fid = "DFID"\n",
2851 PFID(lu_object_fid(o)));
2854 lu_object_header_fini(h);
2859 static int mdt_object_print(const struct lu_context *ctxt, void *cookie,
2860 lu_printer_t p, const struct lu_object *o)
2862 return (*p)(ctxt, cookie, LUSTRE_MDT_NAME"-object@%p", o);
2865 static struct lu_device_operations mdt_lu_ops = {
2866 .ldo_object_alloc = mdt_object_alloc,
2867 .ldo_process_config = mdt_process_config
2870 static struct lu_object_operations mdt_obj_ops = {
2871 .loo_object_init = mdt_object_init,
2872 .loo_object_free = mdt_object_free,
2873 .loo_object_print = mdt_object_print
2876 /* mds_connect_internal */
2877 static int mdt_connect_internal(struct obd_export *exp,
2878 struct mdt_device *mdt,
2879 struct obd_connect_data *data)
2882 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
2883 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
2885 /* If no known bits (which should not happen, probably,
2886 as everybody should support LOOKUP and UPDATE bits at least)
2887 revert to compat mode with plain locks. */
2888 if (!data->ocd_ibits_known &&
2889 data->ocd_connect_flags & OBD_CONNECT_IBITS)
2890 data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
2892 if (!mdt->mdt_opts.mo_acl)
2893 data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
2895 if (!mdt->mdt_opts.mo_user_xattr)
2896 data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
2898 exp->exp_connect_flags = data->ocd_connect_flags;
2899 data->ocd_version = LUSTRE_VERSION_CODE;
2900 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
2903 if (mdt->mdt_opts.mo_acl &&
2904 ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
2905 CWARN("%s: MDS requires ACL support but client does not\n",
2906 mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
2912 /* mds_connect copy */
2913 static int mdt_obd_connect(const struct lu_context *ctx,
2914 struct lustre_handle *conn, struct obd_device *obd,
2915 struct obd_uuid *cluuid,
2916 struct obd_connect_data *data)
2918 struct mdt_export_data *med;
2919 struct mdt_client_data *mcd;
2920 struct obd_export *exp;
2921 struct mdt_device *mdt;
2925 LASSERT(ctx != NULL);
2926 if (!conn || !obd || !cluuid)
2929 mdt = mdt_dev(obd->obd_lu_dev);
2931 rc = class_connect(conn, obd, cluuid);
2935 exp = class_conn2export(conn);
2936 LASSERT(exp != NULL);
2937 med = &exp->exp_mdt_data;
2939 rc = mdt_connect_internal(exp, mdt, data);
2943 memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
2945 rc = mdt_client_add(ctx, mdt, med, -1);
2953 class_disconnect(exp);
2955 class_export_put(exp);
2960 static int mdt_obd_reconnect(struct obd_export *exp, struct obd_device *obd,
2961 struct obd_uuid *cluuid,
2962 struct obd_connect_data *data)
2967 if (exp == NULL || obd == NULL || cluuid == NULL)
2970 rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
2975 static int mdt_obd_disconnect(struct obd_export *exp)
2981 class_export_get(exp);
2983 /* Disconnect early so that clients can't keep using export */
2984 rc = class_disconnect(exp);
2985 //ldlm_cancel_locks_for_export(exp);
2987 /* complete all outstanding replies */
2988 spin_lock(&exp->exp_lock);
2989 while (!list_empty(&exp->exp_outstanding_replies)) {
2990 struct ptlrpc_reply_state *rs =
2991 list_entry(exp->exp_outstanding_replies.next,
2992 struct ptlrpc_reply_state, rs_exp_list);
2993 struct ptlrpc_service *svc = rs->rs_service;
2995 spin_lock(&svc->srv_lock);
2996 list_del_init(&rs->rs_exp_list);
2997 ptlrpc_schedule_difficult_reply(rs);
2998 spin_unlock(&svc->srv_lock);
3000 spin_unlock(&exp->exp_lock);
3002 class_export_put(exp);
3006 /* FIXME: Can we avoid using these two interfaces? */
3007 static int mdt_init_export(struct obd_export *exp)
3009 struct mdt_export_data *med = &exp->exp_mdt_data;
3012 INIT_LIST_HEAD(&med->med_open_head);
3013 spin_lock_init(&med->med_open_lock);
3014 exp->exp_connecting = 1;
3018 static int mdt_destroy_export(struct obd_export *export)
3020 struct mdt_export_data *med;
3021 struct obd_device *obd = export->exp_obd;
3022 struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
3023 struct mdt_thread_info *info;
3024 struct lu_context ctxt;
3028 med = &export->exp_mdt_data;
3030 target_destroy_export(export);
3032 if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
3035 LASSERT(mdt != NULL);
3036 rc = lu_context_init(&ctxt, LCT_MD_THREAD);
3040 lu_context_enter(&ctxt);
3042 info = lu_context_key_get(&ctxt, &mdt_thread_key);
3043 LASSERT(info != NULL);
3044 memset(info, 0, sizeof *info);
3045 /* Close any open files (which may also cause orphan unlinking). */
3046 spin_lock(&med->med_open_lock);
3047 while (!list_empty(&med->med_open_head)) {
3048 struct list_head *tmp = med->med_open_head.next;
3049 struct mdt_file_data *mfd =
3050 list_entry(tmp, struct mdt_file_data, mfd_list);
3051 struct mdt_object *o = mfd->mfd_object;
3053 /* Remove mfd handle so it can't be found again.
3054 * We are consuming the mfd_list reference here. */
3055 class_handle_unhash(&mfd->mfd_handle);
3056 list_del_init(&mfd->mfd_list);
3057 spin_unlock(&med->med_open_lock);
3058 mdt_mfd_close(&ctxt, mdt, mfd, &info->mti_attr);
3059 /* TODO: if we close the unlinked file,
3060 * we need to remove it's objects from OST */
3061 mdt_object_put(&ctxt, o);
3062 spin_lock(&med->med_open_lock);
3064 spin_unlock(&med->med_open_lock);
3065 mdt_client_free(&ctxt, mdt, med);
3067 lu_context_exit(&ctxt);
3068 lu_context_fini(&ctxt);
3073 static int mdt_upcall(const struct lu_context *ctx, struct md_device *md,
3074 enum md_upcall_event ev)
3076 struct mdt_device *m = mdt_dev(&md->md_lu_dev);
3077 struct md_device *next = m->mdt_child;
3083 rc = next->md_ops->mdo_get_maxsize(ctx, next,
3085 &m->mdt_max_cookiesize);
3086 CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
3087 m->mdt_max_mdsize, m->mdt_max_cookiesize);
3090 CERROR("invalid event\n");
3097 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3098 void *karg, void *uarg)
3100 struct lu_context ctxt;
3101 struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
3102 struct dt_device *dt = mdt->mdt_bottom;
3106 CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
3107 rc = lu_context_init(&ctxt, LCT_MD_THREAD);
3110 lu_context_enter(&ctxt);
3111 if (cmd == OBD_IOC_SYNC || cmd == OBD_IOC_SET_READONLY) {
3112 rc = dt->dd_ops->dt_sync(&ctxt, dt);
3113 if (cmd == OBD_IOC_SET_READONLY)
3114 dt->dd_ops->dt_ro(&ctxt, dt);
3117 lu_context_exit(&ctxt);
3118 lu_context_fini(&ctxt);
3122 static struct obd_ops mdt_obd_device_ops = {
3123 .o_owner = THIS_MODULE,
3124 .o_connect = mdt_obd_connect,
3125 .o_reconnect = mdt_obd_reconnect,
3126 .o_disconnect = mdt_obd_disconnect,
3127 .o_init_export = mdt_init_export,
3128 .o_destroy_export = mdt_destroy_export,
3129 .o_iocontrol = mdt_iocontrol
3132 static struct lu_device* mdt_device_fini(const struct lu_context *ctx,
3133 struct lu_device *d)
3135 struct mdt_device *m = mdt_dev(d);
3141 static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
3143 struct mdt_device *m = mdt_dev(d);
3148 static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
3149 struct lu_device_type *t,
3150 struct lustre_cfg *cfg)
3152 struct lu_device *l;
3153 struct mdt_device *m;
3159 l = &m->mdt_md_dev.md_lu_dev;
3160 rc = mdt_init0(ctx, m, t, cfg);
3166 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
3168 l = ERR_PTR(-ENOMEM);
3173 * context key constructor/destructor
3175 static void *mdt_thread_init(const struct lu_context *ctx,
3176 struct lu_context_key *key)
3178 struct mdt_thread_info *info;
3181 * check that no high order allocations are incurred.
3183 CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
3184 OBD_ALLOC_PTR(info);
3186 info = ERR_PTR(-ENOMEM);
3190 static void mdt_thread_fini(const struct lu_context *ctx,
3191 struct lu_context_key *key, void *data)
3193 struct mdt_thread_info *info = data;
3197 struct lu_context_key mdt_thread_key = {
3198 .lct_tags = LCT_MD_THREAD,
3199 .lct_init = mdt_thread_init,
3200 .lct_fini = mdt_thread_fini
3203 static void *mdt_txn_init(const struct lu_context *ctx,
3204 struct lu_context_key *key)
3206 struct mdt_txn_info *txi;
3209 * check that no high order allocations are incurred.
3211 CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
3214 txi = ERR_PTR(-ENOMEM);
3218 static void mdt_txn_fini(const struct lu_context *ctx,
3219 struct lu_context_key *key, void *data)
3221 struct mdt_txn_info *txi = data;
3225 struct lu_context_key mdt_txn_key = {
3226 .lct_tags = LCT_TX_HANDLE,
3227 .lct_init = mdt_txn_init,
3228 .lct_fini = mdt_txn_fini
3232 static int mdt_type_init(struct lu_device_type *t)
3236 rc = lu_context_key_register(&mdt_thread_key);
3238 rc = lu_context_key_register(&mdt_txn_key);
3242 static void mdt_type_fini(struct lu_device_type *t)
3244 lu_context_key_degister(&mdt_thread_key);
3245 lu_context_key_degister(&mdt_txn_key);
3248 static struct lu_device_type_operations mdt_device_type_ops = {
3249 .ldto_init = mdt_type_init,
3250 .ldto_fini = mdt_type_fini,
3252 .ldto_device_alloc = mdt_device_alloc,
3253 .ldto_device_free = mdt_device_free,
3254 .ldto_device_fini = mdt_device_fini
3257 static struct lu_device_type mdt_device_type = {
3258 .ldt_tags = LU_DEVICE_MD,
3259 .ldt_name = LUSTRE_MDT_NAME,
3260 .ldt_ops = &mdt_device_type_ops,
3261 .ldt_ctx_tags = LCT_MD_THREAD
3264 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
3268 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
3272 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
3274 static int __init mdt_mod_init(void)
3277 struct lprocfs_static_vars lvars;
3279 printk(KERN_INFO "Lustre: MetaData Target; info@clusterfs.com\n");
3281 mdt_num_threads = MDT_NUM_THREADS;
3282 lprocfs_init_vars(mdt, &lvars);
3283 rc = class_register_type(&mdt_obd_device_ops, NULL,
3284 lvars.module_vars, LUSTRE_MDT_NAME,
3289 static void __exit mdt_mod_exit(void)
3291 class_unregister_type(LUSTRE_MDT_NAME);
3295 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt) \
3296 [prefix ## _ ## opc - prefix ## _ ## base] = { \
3298 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
3299 .mh_opc = prefix ## _ ## opc, \
3300 .mh_flags = flags, \
3305 #define DEF_MDT_HNDL(flags, name, fn, fmt) \
3306 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
3308 #define DEF_SEQ_HNDL(flags, name, fn, fmt) \
3309 DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, fmt)
3311 #define DEF_FLD_HNDL(flags, name, fn, fmt) \
3312 DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, fmt)
3314 * Request with a format known in advance
3316 #define DEF_MDT_HNDL_F(flags, name, fn) \
3317 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
3319 #define DEF_SEQ_HNDL_F(flags, name, fn) \
3320 DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)
3322 #define DEF_FLD_HNDL_F(flags, name, fn) \
3323 DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)
3325 * Request with a format we do not yet know
3327 #define DEF_MDT_HNDL_0(flags, name, fn) \
3328 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
3330 static struct mdt_handler mdt_mds_ops[] = {
3331 DEF_MDT_HNDL_F(0, CONNECT, mdt_connect),
3332 DEF_MDT_HNDL_F(0, DISCONNECT, mdt_disconnect),
3333 DEF_MDT_HNDL_F(0 |HABEO_REFERO, GETSTATUS, mdt_getstatus),
3334 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR, mdt_getattr),
3335 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
3336 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO|MUTABOR,
3337 SETXATTR, mdt_setxattr),
3338 DEF_MDT_HNDL_F(HABEO_CORPUS, GETXATTR, mdt_getxattr),
3339 DEF_MDT_HNDL_F(0 |HABEO_REFERO, STATFS, mdt_statfs),
3340 DEF_MDT_HNDL_F(0 |MUTABOR,
3342 DEF_MDT_HNDL_F(HABEO_CORPUS , CLOSE, mdt_close),
3343 DEF_MDT_HNDL_0(0, DONE_WRITING, mdt_done_writing),
3344 DEF_MDT_HNDL_F(0 |HABEO_REFERO, PIN, mdt_pin),
3345 DEF_MDT_HNDL_0(0, SYNC, mdt_sync),
3346 DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_quotacheck_handle),
3347 DEF_MDT_HNDL_0(0, QUOTACTL, mdt_quotactl_handle)
3350 #define DEF_OBD_HNDL(flags, name, fn) \
3351 DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
3354 static struct mdt_handler mdt_obd_ops[] = {
3355 DEF_OBD_HNDL(0, PING, mdt_obd_ping),
3356 DEF_OBD_HNDL(0, LOG_CANCEL, mdt_obd_log_cancel),
3357 DEF_OBD_HNDL(0, QC_CALLBACK, mdt_obd_qc_callback)
3360 #define DEF_DLM_HNDL_0(flags, name, fn) \
3361 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
3362 #define DEF_DLM_HNDL_F(flags, name, fn) \
3363 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
3365 static struct mdt_handler mdt_dlm_ops[] = {
3366 DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
3367 DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT, mdt_convert),
3368 DEF_DLM_HNDL_0(0, BL_CALLBACK, mdt_bl_callback),
3369 DEF_DLM_HNDL_0(0, CP_CALLBACK, mdt_cp_callback)
3372 static struct mdt_handler mdt_llog_ops[] = {
3375 static struct mdt_opc_slice mdt_regular_handlers[] = {
3377 .mos_opc_start = MDS_GETATTR,
3378 .mos_opc_end = MDS_LAST_OPC,
3379 .mos_hs = mdt_mds_ops
3382 .mos_opc_start = OBD_PING,
3383 .mos_opc_end = OBD_LAST_OPC,
3384 .mos_hs = mdt_obd_ops
3387 .mos_opc_start = LDLM_ENQUEUE,
3388 .mos_opc_end = LDLM_LAST_OPC,
3389 .mos_hs = mdt_dlm_ops
3392 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
3393 .mos_opc_end = LLOG_LAST_OPC,
3394 .mos_hs = mdt_llog_ops
3401 static struct mdt_handler mdt_readpage_ops[] = {
3402 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
3403 #ifdef HAVE_SPLIT_SUPPORT
3404 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
3408 * XXX: this is ugly and should be fixed one day, see mdc_close() for
3409 * detailed comments. --umka
3411 DEF_MDT_HNDL_F(HABEO_CORPUS, CLOSE, mdt_close),
3414 static struct mdt_opc_slice mdt_readpage_handlers[] = {
3416 .mos_opc_start = MDS_GETATTR,
3417 .mos_opc_end = MDS_LAST_OPC,
3418 .mos_hs = mdt_readpage_ops
3425 static struct mdt_handler mdt_seq_ops[] = {
3426 DEF_SEQ_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))seq_query)
3429 static struct mdt_opc_slice mdt_seq_handlers[] = {
3431 .mos_opc_start = SEQ_QUERY,
3432 .mos_opc_end = SEQ_LAST_OPC,
3433 .mos_hs = mdt_seq_ops
3440 static struct mdt_handler mdt_fld_ops[] = {
3441 DEF_FLD_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))fld_query)
3444 static struct mdt_opc_slice mdt_fld_handlers[] = {
3446 .mos_opc_start = FLD_QUERY,
3447 .mos_opc_end = FLD_LAST_OPC,
3448 .mos_hs = mdt_fld_ops
3455 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3456 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
3457 MODULE_LICENSE("GPL");
3459 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
3460 "number of mdt service threads to start");
3462 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);