1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/mdt/mdt_handler.c
5 * Lustre Metadata Target (mdt) request handler
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Peter Braam <braam@clusterfs.com>
9 * Author: Andreas Dilger <adilger@clusterfs.com>
10 * Author: Phil Schwan <phil@clusterfs.com>
11 * Author: Mike Shaver <shaver@clusterfs.com>
12 * Author: Nikita Danilov <nikita@clusterfs.com>
13 * Author: Huang Hua <huanghua@clusterfs.com>
15 * This file is part of the Lustre file system, http://www.lustre.org
16 * Lustre is a trademark of Cluster File Systems, Inc.
18 * You may have signed or agreed to another license before downloading
19 * this software. If so, you are bound by the terms and conditions
20 * of that agreement, and the following does not apply to you. See the
21 * LICENSE file included with this distribution for more information.
23 * If you did not agree to a different license, then this copy of Lustre
24 * is open source software; you can redistribute it and/or modify it
25 * under the terms of version 2 of the GNU General Public License as
26 * published by the Free Software Foundation.
28 * In either case, Lustre is distributed in the hope that it will be
29 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
31 * license text for more details.
35 # define EXPORT_SYMTAB
37 #define DEBUG_SUBSYSTEM S_MDS
39 #include <linux/module.h>
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
44 * struct OBD_{ALLOC,FREE}*()
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
55 #include <dt_object.h>
56 #include <lustre_mds.h>
57 #include "mdt_internal.h"
58 #include <linux/lustre_acl.h>
60 * Initialized in mdt_mod_init().
/* Number of MDT service threads; module-scope tunable (per comment above). */
62 unsigned long mdt_num_threads;
64 /* ptlrpc request handler for MDT. All handlers are
65 * grouped into several slices - struct mdt_opc_slice,
66 * and stored in an array - mdt_handlers[].
/*
 * NOTE(review): this extract is missing the "struct mdt_handler {" line and
 * the member declarations for the name/fail-id/opcode/flags fields -- only
 * their comments survive.  mh_act and mh_fmt are the two members visible.
 */
69 /* The name of this handler. */
71 /* Fail id for this handler, checked at the beginning of this handler*/
73 /* Operation code for this handler */
75 /* flags are listed in enum mdt_handler_flags below. */
77 /* The actual handler function to execute. */
78 int (*mh_act)(struct mdt_thread_info *info);
79 /* Request format for this request. */
80 const struct req_format *mh_fmt;
83 enum mdt_handler_flags {
85 * struct mdt_body is passed in the incoming message, and object
86 * identified by this fid exists on disk.
88 * "habeo corpus" == "I have a body"
90 HABEO_CORPUS = (1 << 0),
92 * struct ldlm_request is passed in the incoming message.
94 * "habeo clavis" == "I have a key"
96 HABEO_CLAVIS = (1 << 1),
98 * this request has fixed reply format, so that reply message can be
99 * packed by generic code.
101 * "habeo refero" == "I have a reply"
103 HABEO_REFERO = (1 << 2),
105 * this request will modify something, so check whether the filesystem
106 * is readonly or not, then return -EROFS to client asap if necessary.
108 * "mutabor" == "I shall modify"
/* NOTE(review): the MUTABOR enumerator line itself is missing from this
 * extract; the flag is tested in mdt_req_handle() below. */
/* A slice maps a contiguous opcode range onto an array of handlers.
 * NOTE(review): the mos_opc_start/mos_opc_end member lines are missing here
 * but are referenced by mdt_handler_find(). */
113 struct mdt_opc_slice {
116 struct mdt_handler *mos_hs;
/* Forward declarations for the handler tables and helpers defined below. */
119 static struct mdt_opc_slice mdt_regular_handlers[];
120 static struct mdt_opc_slice mdt_readpage_handlers[];
122 static struct mdt_device *mdt_dev(struct lu_device *d);
123 static int mdt_regular_handle(struct ptlrpc_request *req);
124 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
126 static struct lu_object_operations mdt_obj_ops;
/* Test whether disposition bit(s) @flag are set in the intent reply's
 * lock_policy_res1 word. */
128 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
132 return (rep->lock_policy_res1 & flag);
/* Record disposition @flag in the per-request opdata and in the DLM reply.
 * NOTE(review): the NULL guards around the two assignments appear to be
 * missing from this extract (original line numbers 139/141 are not
 * adjacent) -- confirm against the full source. */
135 void mdt_set_disposition(struct mdt_thread_info *info,
136 struct ldlm_reply *rep, int flag)
139 info->mti_opdata |= flag;
141 rep->lock_policy_res1 |= flag;
/* MDS_GETSTATUS handler: ask the child md_device for the root fid and pack
 * it into the reply mdt_body.  NOTE(review): braces and the RETURN path are
 * missing from this extract. */
145 static int mdt_getstatus(struct mdt_thread_info *info)
147 struct md_device *next = info->mti_mdt->mdt_child;
149 struct mdt_body *body;
/* Fault-injection point for recovery tests. */
153 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
156 body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
157 result = next->md_ops->mdo_root_get(info->mti_ctxt,
/* Mark the fid in the reply body as valid. */
160 body->valid |= OBD_MD_FLID;
/* MDS_STATFS handler: obtain filesystem statistics from the child device
 * and pack them into the reply as struct obd_statfs. */
166 static int mdt_statfs(struct mdt_thread_info *info)
168 struct md_device *next = info->mti_mdt->mdt_child;
169 struct obd_statfs *osfs;
174 /* This will trigger a watchdog timeout */
175 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
176 (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
179 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
182 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
183 /* XXX max_age optimisation is needed here. See mds_statfs */
184 result = next->md_ops->mdo_statfs(info->mti_ctxt,
185 next, &info->mti_u.ksfs);
/* Convert the kernel statfs result into the wire obd_statfs format. */
186 statfs_pack(osfs, &info->mti_u.ksfs);
/* Copy attributes from @attr into the wire mdt_body @b, setting the
 * corresponding OBD_MD_* valid bits. */
192 void mdt_pack_attr2body(struct mdt_body *b, const struct lu_attr *attr,
193 const struct lu_fid *fid)
195 /*XXX should pack the reply body according to lu_valid*/
196 b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID |
197 OBD_MD_FLGID | OBD_MD_FLTYPE |
198 OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
199 OBD_MD_FLATIME | OBD_MD_FLMTIME ;
/* Size/blocks/rdev are only marked valid for non-regular files here;
 * presumably a regular file's size is owned by the data servers -- TODO
 * confirm against the full source. */
201 if (!S_ISREG(attr->la_mode))
202 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
204 b->atime = attr->la_atime;
205 b->mtime = attr->la_mtime;
206 b->ctime = attr->la_ctime;
207 b->mode = attr->la_mode;
208 b->size = attr->la_size;
209 b->blocks = attr->la_blocks;
210 b->uid = attr->la_uid;
211 b->gid = attr->la_gid;
212 b->flags = attr->la_flags;
213 b->nlink = attr->la_nlink;
214 b->rdev = attr->la_rdev;
/* NOTE(review): the "b->fid1 = *fid" assignment (and any NULL guard on
 * @fid) appears to be missing from this extract; only the valid-bit update
 * and the debug print using @fid survive. */
218 b->valid |= OBD_MD_FLID;
219 CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
220 PFID(fid), b->nlink, b->mode, b->size);
/* True iff the reply should carry striping (LOV EA) data: a regular file
 * with OBD_MD_FLEASIZE requested, or a directory with OBD_MD_FLDIREA. */
224 static inline int mdt_body_has_lov(const struct lu_attr *la,
225 const struct mdt_body *body)
227 return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
228 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
/*
 * Core getattr: fetch attributes (and optionally LOV EA / symlink target /
 * ACL) for object @o and pack them into the reply capsule.
 * Steps visible below: zero the EA/ACL sizes, point ma_lmm at the reply's
 * MD buffer, call mo_attr_get(), then fill the reply body piecewise.
 * NOTE(review): several control-flow lines (braces, else arms, RETURNs,
 * declarations of rc/buffer/length) are missing from this extract.
 */
231 static int mdt_getattr_internal(struct mdt_thread_info *info,
232 struct mdt_object *o)
234 struct md_object *next = mdt_object_child(o);
235 const struct mdt_body *reqbody = info->mti_body;
236 struct ptlrpc_request *req = mdt_info_req(info);
237 struct md_attr *ma = &info->mti_attr;
238 struct lu_attr *la = &ma->ma_attr;
239 struct req_capsule *pill = &info->mti_pill;
240 const struct lu_context *ctxt = info->mti_ctxt;
241 struct mdt_body *repbody;
247 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
250 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
251 repbody->eadatasize = 0;
252 repbody->aclsize = 0;
/* EA data is written directly into the reply buffer by the lower layer. */
254 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
255 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER);
257 ma->ma_need = MA_INODE | MA_LOV;
258 rc = mo_attr_get(ctxt, next, ma);
259 if (rc == -EREMOTE) {
260 /* This object is located on remote node.*/
261 repbody->fid1 = *mdt_object_fid(o);
262 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
265 CERROR("getattr error for "DFID": %d\n",
266 PFID(mdt_object_fid(o)), rc);
270 if (ma->ma_valid & MA_INODE)
271 mdt_pack_attr2body(repbody, la, mdt_object_fid(o));
/* Pack striping info when the client asked for it and the file type has
 * an EA (regular file or striped directory). */
275 if (mdt_body_has_lov(la, reqbody)) {
276 if (ma->ma_valid & MA_LOV) {
277 LASSERT(ma->ma_lmm_size);
278 mdt_dump_lmm(D_INFO, ma->ma_lmm);
279 repbody->eadatasize = ma->ma_lmm_size;
280 if (S_ISDIR(la->la_mode))
281 repbody->valid |= OBD_MD_FLDIREA;
283 repbody->valid |= OBD_MD_FLEASIZE;
/* Symlink: read the target into the MD buffer instead. */
285 } else if (S_ISLNK(la->la_mode) &&
286 reqbody->valid & OBD_MD_LINKNAME) {
287 rc = mo_readlink(ctxt, next, ma->ma_lmm, ma->ma_lmm_size);
289 CERROR("readlink failed: %d\n", rc);
/* mo_readlink() returned the link length in rc; terminate the string. */
292 repbody->valid |= OBD_MD_LINKNAME;
293 repbody->eadatasize = rc + 1;
294 ((char*)ma->ma_lmm)[rc] = 0; /* NULL terminate */
295 CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
296 (char*)ma->ma_lmm, rc);
/* Report the server-side maximum MD/cookie sizes when requested. */
301 if (reqbody->valid & OBD_MD_FLMODEASIZE) {
302 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
303 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
304 repbody->valid |= OBD_MD_FLMODEASIZE;
305 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
306 "MAX_COOKIE to : %d:%d\n",
308 repbody->max_cookiesize);
/* POSIX ACL: fetch the access ACL xattr into the reply's ACL buffer.
 * -ENODATA/-EOPNOTSUPP (no ACL / not supported) are tolerated. */
311 #ifdef CONFIG_FS_POSIX_ACL
312 if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
313 (reqbody->valid & OBD_MD_FLACL)) {
314 buffer = req_capsule_server_get(pill, &RMF_ACL);
315 length = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
317 rc = mo_xattr_get(ctxt, next, buffer,
318 length, XATTR_NAME_ACL_ACCESS);
320 if (rc == -ENODATA || rc == -EOPNOTSUPP)
323 CERROR("got acl size: %d\n", rc);
325 repbody->aclsize = rc;
326 repbody->valid |= OBD_MD_FLACL;
/* MDS_GETATTR handler: getattr by fid on the already-resolved request
 * object, then shrink the reply to the actually-used EA/ACL sizes. */
335 static int mdt_getattr(struct mdt_thread_info *info)
338 struct mdt_object *obj;
340 obj = info->mti_object;
341 LASSERT(obj != NULL);
342 LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
345 result = mdt_getattr_internal(info, obj);
346 mdt_shrink_reply(info, REPLY_REC_OFF + 1);
/*
 * Getattr-by-name with locking.
351 * UPDATE lock should be taken against parent, and be release before exit;
352 * child_bits lock should be taken against child, and be returned back:
353 * (1)normal request should release the child lock;
354 * (2)intent request will grant the lock to client.
 *
 * NOTE(review): braces/ENTRY/RETURN/else lines are missing from this
 * extract; the parameter line declaring child_bits is also absent.
 */
356 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
357 struct mdt_lock_handle *lhc,
359 struct ldlm_reply *ldlm_rep)
361 struct mdt_object *parent = info->mti_object;
362 struct mdt_object *child;
363 struct md_object *next = mdt_object_child(info->mti_object);
364 struct lu_fid *child_fid = &info->mti_tmp_fid1;
367 struct mdt_lock_handle *lhp;
370 LASSERT(info->mti_object != NULL);
371 name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
375 CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
376 PFID(mdt_object_fid(parent)), name, ldlm_rep);
378 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
379 if (strlen(name) == 0) {
380 /* only getattr on the child. parent is on another node. */
381 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS)exi;
383 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
385 PFID(mdt_object_fid(child)), ldlm_rep);
/* Lock only the child (CR mode) and return its attributes. */
387 mdt_lock_handle_init(lhc);
388 lhc->mlh_mode = LCK_CR;
389 result = mdt_object_lock(info, child, lhc, child_bits);
391 /* finally, we can get attr for child. */
392 result = mdt_getattr_internal(info, child);
394 mdt_object_unlock(info, child, lhc, 1);
399 /*step 1: lock parent */
400 lhp = &info->mti_lh[MDT_LH_PARENT];
401 lhp->mlh_mode = LCK_CR;
402 result = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
406 /*step 2: lookup child's fid by name */
407 result = mdo_lookup(info->mti_ctxt, next, name, child_fid);
409 if (result == -ENOENT)
410 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
411 GOTO(out_parent, result);
413 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
415 *step 3: find the child object by fid & lock it.
416 * regardless if it is local or remote.
418 mdt_lock_handle_init(lhc);
419 lhc->mlh_mode = LCK_CR;
420 child = mdt_object_find_lock(info, child_fid, lhc, child_bits);
422 GOTO(out_parent, result = PTR_ERR(child));
424 /* finally, we can get attr for child. */
425 result = mdt_getattr_internal(info, child);
427 mdt_object_unlock(info, child, lhc, 1);
429 /* This is pure debugging code. */
430 struct ldlm_lock *lock;
431 struct ldlm_res_id *res_id;
432 lock = ldlm_handle2lock(&lhc->mlh_lh);
434 res_id = &lock->l_resource->lr_name;
435 LDLM_DEBUG(lock, "we will return this lock client\n");
/* Sanity check: the returned lock's resource must match the child fid. */
436 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
437 &lock->l_resource->lr_name),
438 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
439 (unsigned long)res_id->name[0],
440 (unsigned long)res_id->name[1],
441 (unsigned long)res_id->name[2],
442 PFID(mdt_object_fid(child)));
446 mdt_object_put(info->mti_ctxt, child);
/* Parent UPDATE lock is always dropped before returning. */
450 mdt_object_unlock(info, parent, lhp, 1);
455 /* normal handler: should release the child lock */
456 static int mdt_getattr_name(struct mdt_thread_info *info)
458 struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
463 rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
/* Non-intent path: drop the child lock instead of granting it. */
464 if (lustre_handle_is_used(&lhc->mlh_lh)) {
465 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
466 lhc->mlh_lh.cookie = 0;
468 mdt_shrink_reply(info, REPLY_REC_OFF + 1);
472 static struct lu_device_operations mdt_lu_ops;
/* True iff @d is an MDT device (identified by its ops vector); vacuously
 * true for NULL per ergo(). */
474 static int lu_device_is_mdt(struct lu_device *d)
476 return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
/* Downcast lu_device -> mdt_device; asserts the device really is an MDT. */
479 static struct mdt_device *mdt_dev(struct lu_device *d)
481 LASSERT(lu_device_is_mdt(d));
482 return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
/* MDS_CONNECT handler: delegate to the generic target connect code, then
 * cache the mdt_device pointer in the per-request info (the export only
 * exists after a successful connect). */
485 static int mdt_connect(struct mdt_thread_info *info)
488 struct ptlrpc_request *req;
490 req = mdt_info_req(info);
491 result = target_handle_connect(req, mdt_regular_handle);
493 LASSERT(req->rq_export != NULL);
494 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
/* MDS_DISCONNECT handler: generic target disconnect. */
499 static int mdt_disconnect(struct mdt_thread_info *info)
501 return target_handle_disconnect(mdt_info_req(info));
/*
 * Push the pages in @rdpg to the client via a bulk PUT.  Waits up to
 * obd_timeout/4 for completion; on timeout or a short/failed transfer the
 * client export is evicted.  NOTE(review): several lines (rc/i/tmpcount
 * declarations, GOTO labels' EXIT paths) are missing from this extract.
 */
504 static int mdt_sendpage(struct mdt_thread_info *info,
505 struct lu_rdpg *rdpg)
507 struct ptlrpc_request *req = mdt_info_req(info);
508 struct ptlrpc_bulk_desc *desc;
509 struct l_wait_info *lwi = &info->mti_u.rdpg.mti_wait_info;
516 desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
519 GOTO(out, rc = -ENOMEM);
/* Queue each page; the last page may be partial (tmpcount remainder). */
521 for (i = 0, tmpcount = rdpg->rp_count;
522 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
523 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
524 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
527 LASSERT(desc->bd_nob == rdpg->rp_count);
528 rc = ptlrpc_start_bulk_transfer(desc);
/* Fault injection: abort the bulk without sending. */
532 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
533 GOTO(abort_bulk, rc);
535 *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
536 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
537 LASSERT (rc == 0 || rc == -ETIMEDOUT);
/* Success only if the peer acked the full byte count. */
540 if (desc->bd_success &&
541 desc->bd_nob_transferred == rdpg->rp_count)
544 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
547 DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
548 (rc == -ETIMEDOUT) ? "timeout" : "network error",
549 desc->bd_nob_transferred, rdpg->rp_count,
550 req->rq_export->exp_client_uuid.uuid,
551 req->rq_export->exp_connection->c_remote_uuid.uuid);
/* A client that cannot complete bulk I/O is evicted. */
553 class_fail_export(req->rq_export);
557 ptlrpc_abort_bulk(desc);
559 ptlrpc_free_bulk(desc);
/*
 * MDS_READPAGE handler: read directory pages and bulk-send them to the
 * client.  Request encoding (see comment below): reqbody->size = start
 * offset/hash, reqbody->nlink = byte count.  Pages are allocated here,
 * filled by the lower layer, sent, then freed.
 */
564 static int mdt_readpage(struct mdt_thread_info *info)
566 struct mdt_object *object = info->mti_object;
567 struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
568 struct mdt_body *reqbody;
569 struct mdt_body *repbody;
574 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
577 reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
578 repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
579 if (reqbody == NULL || repbody == NULL)
583 * prepare @rdpg before calling lower layers and transfer itself. Here
584 * reqbody->size contains offset of where to start to read and
585 * reqbody->nlink contains number bytes to read.
587 rdpg->rp_hash = reqbody->size;
588 rdpg->rp_hash_end = ~0ul;
/* Detect truncation if rp_hash is narrower than the wire __u64. */
589 if ((__u64)rdpg->rp_hash != reqbody->size) {
590 CERROR("Invalid hash: %#llx != %#llx\n",
591 (__u64)rdpg->rp_hash, reqbody->size);
594 rdpg->rp_count = reqbody->nlink;
595 rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
596 OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
597 if (rdpg->rp_pages == NULL)
600 for (i = 0; i < rdpg->rp_npages; ++i) {
601 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
602 if (rdpg->rp_pages[i] == NULL)
603 GOTO(free_rdpg, rc = -ENOMEM);
606 /* call lower layers to fill allocated pages with directory data */
607 rc = mo_readpage(info->mti_ctxt, mdt_object_child(object), rdpg);
611 /* send pages to client */
612 rc = mdt_sendpage(info, rdpg);
/* free_rdpg: release whatever pages were successfully allocated. */
616 for (i = 0; i < rdpg->rp_npages; i++)
617 if (rdpg->rp_pages[i] != NULL)
618 __free_pages(rdpg->rp_pages[i], 0);
619 OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
621 MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
/*
 * Common reintegration path: unpack the reint record for @op, size and
 * pack the reply capsule, handle RESENT reconstruction, then execute the
 * operation via mdt_reint_rec().
 */
626 static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op)
628 struct req_capsule *pill = &info->mti_pill;
629 struct mdt_device *mdt = info->mti_mdt;
630 struct ptlrpc_request *req = mdt_info_req(info);
634 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
637 rc = mdt_reint_unpack(info, op);
/* Reserve worst-case space for striping MD and llog cookies in reply. */
642 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
643 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
644 mdt->mdt_max_mdsize);
645 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
646 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
647 mdt->mdt_max_cookiesize);
648 rc = req_capsule_pack(pill);
/* RESENT request matching the last executed xid: rebuild the saved reply
 * instead of re-executing the (non-idempotent) operation. */
652 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
653 struct mdt_client_data *mcd;
655 mcd = req->rq_export->exp_mdt_data.med_mcd;
656 if (mcd->mcd_last_xid == req->rq_xid) {
657 mdt_reconstruct(info);
658 RETURN(lustre_msg_get_status(req->rq_repmsg));
660 DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
663 rc = mdt_reint_rec(info);
/* Read the reint opcode from the request and extend the capsule with the
 * matching request format from @fmt; unsupported opcodes are rejected. */
668 static long mdt_reint_opcode(struct mdt_thread_info *info,
669 const struct req_format **fmt)
675 ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
678 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
679 if (opc < REINT_MAX && fmt[opc] != NULL)
680 req_capsule_extend(&info->mti_pill, fmt[opc]);
682 CERROR("Unsupported opc: %ld\n", opc);
/* MDS_REINT handler: dispatch by reint opcode via the format table. */
687 static int mdt_reint(struct mdt_thread_info *info)
692 static const struct req_format *reint_fmts[REINT_MAX] = {
693 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
694 [REINT_CREATE] = &RQF_MDS_REINT_CREATE,
695 [REINT_LINK] = &RQF_MDS_REINT_LINK,
696 [REINT_UNLINK] = &RQF_MDS_REINT_UNLINK,
697 [REINT_RENAME] = &RQF_MDS_REINT_RENAME,
698 [REINT_OPEN] = &RQF_MDS_REINT_OPEN
703 opc = mdt_reint_opcode(info, reint_fmts);
705 rc = mdt_reint_internal(info, opc);
/* Allow fault injection on the reply for reint operations. */
709 info->mti_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
713 /* TODO these two methods not available now. */
715 /* this should sync the whole device */
716 static int mdt_device_sync(struct mdt_thread_info *info)
721 /* this should sync this object */
722 static int mdt_object_sync(struct mdt_thread_info *info)
/*
 * MDS_SYNC handler.  A zero fid sequence means "sync the whole device";
 * otherwise sync the single object and return its (post-sync) attributes.
 */
727 static int mdt_sync(struct mdt_thread_info *info)
729 struct req_capsule *pill = &info->mti_pill;
730 struct mdt_body *body;
734 /* The fid may be zero, so we req_capsule_set manually */
735 req_capsule_set(pill, &RQF_MDS_SYNC);
737 body = req_capsule_client_get(pill, &RMF_MDT_BODY);
741 if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
744 if (fid_seq(&body->fid1) == 0) {
745 /* sync the whole device */
746 rc = req_capsule_pack(pill);
748 rc = mdt_device_sync(info);
/* Object sync: unpack body (object must exist) and pack a fixed reply. */
751 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS|HABEO_REFERO);
753 rc = mdt_object_sync(info);
755 struct md_object *next;
756 const struct lu_fid *fid;
759 next = mdt_object_child(info->mti_object);
760 fid = mdt_object_fid(info->mti_object);
761 info->mti_attr.ma_need = MA_INODE;
762 rc = mo_attr_get(info->mti_ctxt, next,
764 la = &info->mti_attr.ma_attr;
766 body = req_capsule_server_get(pill,
768 mdt_pack_attr2body(body, la, fid);
/* Quota handlers -- bodies not visible in this extract (likely stubs). */
776 static int mdt_quotacheck_handle(struct mdt_thread_info *info)
781 static int mdt_quotactl_handle(struct mdt_thread_info *info)
787 * OBD PING and other handlers.
789 static int mdt_obd_ping(struct mdt_thread_info *info)
793 result = target_handle_ping(mdt_info_req(info));
797 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
802 static int mdt_obd_qc_callback(struct mdt_thread_info *info)
/* DLM callback suite passed to ldlm_handle_enqueue0() by mdt_enqueue(). */
812 static struct ldlm_callback_suite cbs = {
813 .lcs_completion = ldlm_server_completion_ast,
814 .lcs_blocking = ldlm_server_blocking_ast,
/* LDLM_ENQUEUE handler: hand the (already swabbed) dlm request to the lock
 * server; a zero result defers to the rq_status set by intent handling. */
818 static int mdt_enqueue(struct mdt_thread_info *info)
821 struct ptlrpc_request *req;
824 * info->mti_dlm_req already contains swapped and (if necessary)
825 * converted dlm request.
827 LASSERT(info->mti_dlm_req != NULL);
829 req = mdt_info_req(info);
830 info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
831 result = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
832 req, info->mti_dlm_req, &cbs);
833 return result ? : req->rq_status;
/* LDLM_CONVERT handler. */
836 static int mdt_convert(struct mdt_thread_info *info)
839 struct ptlrpc_request *req;
841 LASSERT(info->mti_dlm_req);
842 req = mdt_info_req(info);
843 result = ldlm_handle_convert0(req, info->mti_dlm_req);
844 return result ? : req->rq_status;
/* Blocking/completion callbacks are client-side; a server must never
 * receive them -- log and (presumably) fail the request. */
847 static int mdt_bl_callback(struct mdt_thread_info *info)
849 CERROR("bl callbacks should not happen on MDS\n");
854 static int mdt_cp_callback(struct mdt_thread_info *info)
856 CERROR("cp callbacks should not happen on MDS\n");
862 * Build (DLM) resource name from fid.
/* Encodes seq/oid/ver into name[0..2]; remaining words are zeroed.
 * NOTE(review): the "return name;" line is missing from this extract. */
864 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
865 struct ldlm_res_id *name)
867 memset(name, 0, sizeof *name);
868 name->name[0] = fid_seq(f);
869 name->name[1] = fid_oid(f);
870 name->name[2] = fid_ver(f);
874 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
/* Server-local IBITS enqueue on the resource derived from @f; any enqueue
 * failure is collapsed to -EIO. */
875 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
876 struct lustre_handle *lh, ldlm_mode_t mode,
877 ldlm_policy_data_t *policy,
878 struct ldlm_res_id *res_id)
880 int flags = 0; /*XXX: LDLM_FL_LOCAL_ONLY?*/
887 rc = ldlm_cli_enqueue_local(ns, *fid_build_res_name(f, res_id),
888 LDLM_IBITS, policy, mode, &flags,
889 ldlm_blocking_ast, ldlm_completion_ast,
890 NULL, NULL, 0, NULL, lh);
891 return rc == ELDLM_OK ? 0 : -EIO;
894 /* just call ldlm_lock_decref() if decref,
895 * else we only call ptlrpc_save_lock() to save this lock in req.
896 * when transaction committed, req will be released, and lock will, too */
897 void fid_unlock(struct ptlrpc_request *req, const struct lu_fid *f,
898 struct lustre_handle *lh, ldlm_mode_t mode, int decref)
901 /* FIXME: this is debug stuff, remove it later. */
902 struct ldlm_lock *lock = ldlm_handle2lock(lh);
904 CERROR("invalid lock handle "LPX64, lh->cookie);
/* Debug sanity: the handle must refer to @f's resource. */
907 LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
911 ldlm_lock_decref(lh, mode);
913 ptlrpc_save_lock(req, lh, mode);
/* Downcast lu_object -> mdt_object; asserts the owning device is an MDT. */
916 static struct mdt_object *mdt_obj(struct lu_object *o)
918 LASSERT(lu_device_is_mdt(o->lo_dev));
919 return container_of0(o, struct mdt_object, mot_obj.mo_lu);
/* Look up (or instantiate) the object for fid @f in device @d's site.
 * NOTE(review): the IS_ERR handling and RETURN lines between 930 and 932
 * are missing from this extract. */
922 struct mdt_object *mdt_object_find(const struct lu_context *ctxt,
923 struct mdt_device *d,
924 const struct lu_fid *f)
927 struct mdt_object *m;
930 o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
932 m = (struct mdt_object *)o;
/* Take an IBITS lock with bits @ibits on @o, storing the handle in @lh.
 * The handle must be unused and a real mode must already be set. */
938 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
939 struct mdt_lock_handle *lh, __u64 ibits)
941 ldlm_policy_data_t *policy = &info->mti_policy;
942 struct ldlm_res_id *res_id = &info->mti_res_id;
943 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
947 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
948 LASSERT(lh->mlh_mode != LCK_MINMODE);
950 policy->l_inodebits.bits = ibits;
952 rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode,
/* Release (decref=1) or save-until-commit (decref=0) the lock in @lh,
 * then invalidate the handle. No-op for an unused handle. */
957 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
958 struct mdt_lock_handle *lh, int decref)
960 struct ptlrpc_request *req = mdt_info_req(info);
963 if (lustre_handle_is_used(&lh->mlh_lh)) {
964 fid_unlock(req, mdt_object_fid(o),
965 &lh->mlh_lh, lh->mlh_mode, decref);
966 lh->mlh_lh.cookie = 0;
/* find + lock in one call; on lock failure the object reference is
 * dropped (an ERR_PTR return path is not visible in this extract). */
971 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
972 const struct lu_fid *f,
973 struct mdt_lock_handle *lh,
976 struct mdt_object *o;
978 o = mdt_object_find(info->mti_ctxt, info->mti_mdt, f);
982 result = mdt_object_lock(info, o, lh, ibits);
984 mdt_object_put(info->mti_ctxt, o);
/* Convenience: unlock then drop the reference. */
991 void mdt_object_unlock_put(struct mdt_thread_info * info,
992 struct mdt_object * o,
993 struct mdt_lock_handle *lh,
996 mdt_object_unlock(info, o, lh, decref);
997 mdt_object_put(info->mti_ctxt, o);
/* Locate the handler for @opc by scanning the NULL-terminated slice table:
 * each slice covers [mos_opc_start, mos_opc_end) and indexes directly into
 * its handler array.  NULL means unsupported opcode. */
1000 static struct mdt_handler *mdt_handler_find(__u32 opc,
1001 struct mdt_opc_slice *supported)
1003 struct mdt_opc_slice *s;
1004 struct mdt_handler *h;
1007 for (s = supported; s->mos_hs != NULL; s++) {
1008 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1009 h = s->mos_hs + (opc - s->mos_opc_start);
1011 LASSERT(h->mh_opc == opc);
1013 h = NULL; /* unsupported opc */
/* Last transaction xid recorded for this export (on-disk LE format). */
1020 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
1022 return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid);
/* Last close-operation xid for this export (close uses its own slot). */
1025 static inline __u64 req_exp_last_close_xid(struct ptlrpc_request *req)
1027 return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_close_xid);
/* Compatibility shims for old-style lock resource names -- both are
 * placeholders (see XXX comments); wired up in mdt_req_handle(). */
1030 static int mdt_lock_resname_compat(struct mdt_device *m,
1031 struct ldlm_request *req)
1033 /* XXX something... later. */
1037 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
1039 /* XXX something... later. */
1044 * Generic code handling requests that have struct mdt_body passed in:
1046 * - extract mdt_body from request and save it in @info, if present;
1048 * - create lu_object, corresponding to the fid in mdt_body, and save it in
1051 * - if HABEO_CORPUS flag is set for this request type check whether object
1052 * actually exists on storage (lu_object_exists()).
1055 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1057 const struct mdt_body *body;
1058 struct mdt_object *obj;
1059 const struct lu_context *ctx;
1060 struct req_capsule *pill;
1063 ctx = info->mti_ctxt;
1064 pill = &info->mti_pill;
1066 body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1068 if (fid_is_sane(&body->fid1)) {
1069 obj = mdt_object_find(ctx, info->mti_mdt, &body->fid1);
/* HABEO_CORPUS requires the object to exist on storage; drop the
 * reference and fail otherwise (error value line not visible here). */
1071 if ((flags & HABEO_CORPUS) &&
1072 !lu_object_exists(&obj->mot_obj.mo_lu)) {
1073 mdt_object_put(ctx, obj);
/* On success the object is owned by info->mti_object; released in
 * mdt_thread_info_fini(). */
1076 info->mti_object = obj;
1080 result = PTR_ERR(obj);
1082 CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
/* Unpack the request mdt_body (if the format carries one) and, for
 * HABEO_REFERO handlers, size and pack the fixed-format reply. */
1090 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1092 struct req_capsule *pill;
1096 pill = &info->mti_pill;
1098 if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
1099 result = mdt_body_unpack(info, flags);
1103 if (result == 0 && (flags & HABEO_REFERO)) {
1104 struct mdt_device *mdt = info->mti_mdt;
/* Reserve worst-case reply space for striping MD and llog cookies. */
1106 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1107 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1108 mdt->mdt_max_mdsize);
1109 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1110 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1111 mdt->mdt_max_cookiesize);
1113 result = req_capsule_pack(pill);
/* Per-context key for transaction state (defined in mdt txn code). */
1118 struct lu_context_key mdt_txn_key;
/* Stamp transno/last-committed/last-xid into the reply message after the
 * handler ran.  Skipped when no reply was packed or the operation was
 * flagged as not needing a transno. */
1120 static inline void mdt_finish_reply(struct mdt_thread_info *info, int rc)
1122 struct mdt_device *mdt = info->mti_mdt;
1123 struct ptlrpc_request *req = mdt_info_req(info);
1124 struct obd_export *exp = req->rq_export;
1126 /* sometimes the reply message has not been successfully packed */
1127 if (mdt == NULL || req == NULL || req->rq_repmsg == NULL)
1130 if (info->mti_trans_flags & MDT_NONEED_TRANSNO)
1133 /*XXX: assert on this when all code will be finished */
/* A failed operation must not carry a transno; clear it defensively. */
1134 if (rc != 0 && info->mti_transno != 0) {
1135 info->mti_transno = 0;
1136 CERROR("Transno is not 0 while rc is %i!\n", rc);
1139 CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1140 info->mti_transno, exp->exp_obd->obd_last_committed);
/* transno and last_committed are published atomically under the lock. */
1142 spin_lock(&mdt->mdt_transno_lock);
1143 req->rq_transno = info->mti_transno;
1144 lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);
1146 target_committed_to_req(req);
1148 spin_unlock(&mdt->mdt_transno_lock);
1149 lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
1150 //lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
1154 * Invoke handler for this request opc. Also do necessary preprocessing
1155 * (according to handler ->mh_flags), and post-processing (setting of
1156 * ->last_{xid,committed}).
1158 static int mdt_req_handle(struct mdt_thread_info *info,
1159 struct mdt_handler *h, struct ptlrpc_request *req)
1166 LASSERT(h->mh_act != NULL);
1167 LASSERT(h->mh_opc == lustre_msg_get_opc(req->rq_reqmsg));
1168 LASSERT(current->journal_info == NULL);
1170 DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
/* Per-handler fault injection point. */
1172 if (h->mh_fail_id != 0)
1173 MDT_FAIL_RETURN(h->mh_fail_id, 0);
1176 flags = h->mh_flags;
1177 LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
/* Unpack request / pack reply according to the handler's format. */
1179 if (h->mh_fmt != NULL) {
1180 req_capsule_set(&info->mti_pill, h->mh_fmt);
1181 result = mdt_unpack_req_pack_rep(info, flags);
/* MUTABOR handlers fail early on a read-only export (-EROFS line not
 * visible in this extract). */
1184 if (result == 0 && flags & MUTABOR &&
1185 req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
/* HABEO_CLAVIS: extract the DLM request, applying resource-name
 * compatibility conversion when configured. */
1188 if (result == 0 && flags & HABEO_CLAVIS) {
1189 struct ldlm_request *dlm_req;
1191 LASSERT(h->mh_fmt != NULL);
1193 dlm_req = req_capsule_client_get(&info->mti_pill,&RMF_DLM_REQ);
1194 if (dlm_req != NULL) {
1195 if (info->mti_mdt->mdt_opts.mo_compat_resname)
1196 result = mdt_lock_resname_compat(info->mti_mdt,
1198 info->mti_dlm_req = dlm_req;
1200 CERROR("Can't unpack dlm request\n");
1209 result = h->mh_act(info);
1211 * XXX result value is unconditionally shoved into ->rq_status
1212 * (original code sometimes placed error code into ->rq_status, and
1213 * sometimes returned it to the
1214 * caller). ptlrpc_server_handle_request() doesn't check return value
1217 req->rq_status = result;
1219 LASSERT(current->journal_info == NULL);
/* Mirror the resname compat conversion on the DLM reply. */
1221 if (flags & HABEO_CLAVIS && info->mti_mdt->mdt_opts.mo_compat_resname){
1222 struct ldlm_reply *dlmrep;
1224 dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1226 result = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
1229 /* If we're DISCONNECTing, the mdt_export_data is already freed */
1231 if (h->mh_opc != MDS_DISCONNECT &&
1232 h->mh_opc != MDS_READPAGE &&
1233 h->mh_opc != LDLM_ENQUEUE) {
1234 mdt_finish_reply(info, req->rq_status);
/* Reset a lock handle to "unused": zero cookie, no mode. */
1240 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1242 lh->mlh_lh.cookie = 0ull;
1243 lh->mlh_mode = LCK_MINMODE;
/* Finalize a lock handle; it must have been released already. */
1246 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1248 LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
/* Prepare per-request thread info: zero everything, mark reply buffer
 * sizes unset (-1), init lock handles, and bind the request capsule. */
1251 static void mdt_thread_info_init(struct ptlrpc_request *req,
1252 struct mdt_thread_info *info)
1256 memset(info, 0, sizeof(*info));
1258 info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1259 for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1260 info->mti_rep_buf_size[i] = -1;
1262 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1263 mdt_lock_handle_init(&info->mti_lh[i]);
1265 info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1266 info->mti_ctxt = req->rq_svc_thread->t_ctx;
1267 info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1268 /* it can be NULL while CONNECT */
1270 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1271 req_capsule_init(&info->mti_pill, req, RCL_SERVER,
1272 info->mti_rep_buf_size);
/* Tear down per-request state: release capsule, drop the request object
 * reference (if mdt_body_unpack() took one), finalize lock handles. */
1275 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1279 req_capsule_fini(&info->mti_pill);
1280 if (info->mti_object != NULL) {
1281 mdt_object_put(info->mti_ctxt, info->mti_object);
1282 info->mti_object = NULL;
1284 for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1285 mdt_lock_handle_fini(&info->mti_lh[i]);
1289 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
1290 struct obd_device *obd, int *process);
1292 * Handle recovery. Return:
1293 * +1: continue request processing;
1294 * -ve: abort immediately with the given error code;
1295 * 0: send reply with error code in req->rq_status;
1297 static int mdt_recovery(struct ptlrpc_request *req)
1301 struct obd_device *obd;
/* CONNECT is always allowed -- it is how recovery begins. */
1305 if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CONNECT)
/* Any other opcode requires an established export. */
1308 if (req->rq_export == NULL) {
1309 CERROR("operation %d on unconnected MDS from %s\n",
1310 lustre_msg_get_opc(req->rq_reqmsg),
1311 libcfs_id2str(req->rq_peer));
1312 req->rq_status = -ENOTCONN;
1316 /* sanity check: if the xid matches, the request must be marked as a
1317 * resent or replayed */
1318 LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req) ||
1319 req->rq_xid == req_exp_last_close_xid(req),
1320 lustre_msg_get_flags(req->rq_reqmsg) &
1321 (MSG_RESENT | MSG_REPLAY)),
1322 "rq_xid "LPU64" matches last_xid, "
1323 "expected RESENT flag\n", req->rq_xid);
1325 /* else: note the opposite is not always true; a RESENT req after a
1326 * failover will usually not match the last_xid, since it was likely
1327 * never committed. A REPLAYed request will almost never match the
1328 * last xid, however it could for a committed, but still retained,
1331 obd = req->rq_export->exp_obd;
1333 /* Check for aborted recovery... */
/* Snapshot recovery state under the BH lock (shared with ptlrpc). */
1334 spin_lock_bh(&obd->obd_processing_task_lock);
1335 abort_recovery = obd->obd_abort_recovery;
1336 recovering = obd->obd_recovering;
1337 spin_unlock_bh(&obd->obd_processing_task_lock);
1338 if (abort_recovery) {
1339 target_abort_recovery(obd);
1340 } else if (recovering) {
/* During recovery only selected requests are processed immediately. */
1344 rc = mds_filter_recovery_request(req, obd, &should_process);
1345 if (rc != 0 || !should_process) {
/*
 * Send (or queue) the reply for a handled request. During recovery a
 * MSG_LAST_REPLAY request has its final reply queued via
 * target_queue_final_reply(); otherwise the reply is sent immediately
 * with target_send_reply(), honouring info->mti_fail_id for fault
 * injection.
 */
1352 static int mdt_reply(struct ptlrpc_request *req, int result,
1353 struct mdt_thread_info *info)
1355 struct obd_device *obd;
1358 if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
/* LAST_REPLAY is only expected on OBD_PING requests. */
1359 if (lustre_msg_get_opc(req->rq_reqmsg) != OBD_PING)
1360 DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
1362 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
1363 if (obd && obd->obd_recovering) {
1364 DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1365 RETURN(target_queue_final_reply(req, result));
1367 /* Lost a race with recovery; let the error path
1369 result = req->rq_status = -ENOTCONN;
1372 target_send_reply(req, result, info->mti_fail_id);
1377 extern int mds_msg_check_version(struct lustre_msg *msg);
/*
 * Core dispatch for one request: check message version, run the recovery
 * gate, look up the handler for the opcode in the supported slice, execute
 * it, and send the reply. Unsupported opcodes get -ENOTSUPP via
 * ptlrpc_error(); mal-formed requests are dropped.
 */
1379 static int mdt_handle0(struct ptlrpc_request *req,
1380 struct mdt_thread_info *info,
1381 struct mdt_opc_slice *supported)
1383 struct mdt_handler *h;
1384 struct lustre_msg *msg;
/* Fault-injection hook: optionally drop the request once. */
1389 MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
/* A service thread must not carry an open journal handle here. */
1391 LASSERT(current->journal_info == NULL);
1393 msg = req->rq_reqmsg;
1394 result = mds_msg_check_version(msg);
1396 result = mdt_recovery(req);
1399 h = mdt_handler_find(lustre_msg_get_opc(msg),
1402 result = mdt_req_handle(info, h, req);
/* No handler found for this opcode. */
1404 req->rq_status = -ENOTSUPP;
1405 result = ptlrpc_error(req);
1410 result = mdt_reply(req, result, info);
1413 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
1418 * MDT handler function called by ptlrpc service thread when request comes.
1420 * XXX common "target" functionality should be factored into separate module
1421 * shared by mdt, ost and stand-alone services like fld.
/*
 * Common entry point for all MDT ptlrpc services: fetch the per-thread
 * mdt_thread_info from the service thread's lu_context, initialize it for
 * this request, dispatch via mdt_handle0(), then tear the info down.
 */
1423 static int mdt_handle_common(struct ptlrpc_request *req,
1424 struct mdt_opc_slice *supported)
1426 struct lu_context *ctx;
1427 struct mdt_thread_info *info;
1431 ctx = req->rq_svc_thread->t_ctx;
1432 LASSERT(ctx != NULL);
1433 LASSERT(ctx->lc_thread == req->rq_svc_thread);
1434 info = lu_context_key_get(ctx, &mdt_thread_key);
1435 LASSERT(info != NULL);
1437 mdt_thread_info_init(req, info);
1439 result = mdt_handle0(req, info, supported);
1441 mdt_thread_info_fini(info);
/* Handler for the regular MDT service: dispatch via the regular slice. */
1445 static int mdt_regular_handle(struct ptlrpc_request *req)
1447 return mdt_handle_common(req, mdt_regular_handlers);
/* Handler for the readpage MDT service: dispatch via the readpage slice. */
1450 static int mdt_readpage_handle(struct ptlrpc_request *req)
1452 return mdt_handle_common(req, mdt_readpage_handlers);
1468 static int mdt_intent_getattr(enum mdt_it_code opcode,
1469 struct mdt_thread_info *info,
1470 struct ldlm_lock **,
1472 static int mdt_intent_reint(enum mdt_it_code opcode,
1473 struct mdt_thread_info *info,
1474 struct ldlm_lock **,
/*
 * Table describing how each LDLM intent code is handled: the request
 * format to extend the capsule with (it_fmt), handler flags (it_flags,
 * e.g. MUTABOR for modifying intents, HABEO_REFERO for ones that pack a
 * reply), the policy callback (it_act) and, for reintegration intents,
 * the REINT_* opcode (it_reint). Indexed by enum mdt_it_code.
 */
1477 static struct mdt_it_flavor {
1478 const struct req_format *it_fmt;
1480 int (*it_act)(enum mdt_it_code ,
1481 struct mdt_thread_info *,
1482 struct ldlm_lock **,
1485 } mdt_it_flavor[] = {
1487 .it_fmt = &RQF_LDLM_INTENT,
1488 /*.it_flags = HABEO_REFERO,*/
1490 .it_act = mdt_intent_reint,
1491 .it_reint = REINT_OPEN
1494 .it_fmt = &RQF_LDLM_INTENT,
1495 .it_flags = MUTABOR,
1496 .it_act = mdt_intent_reint,
1497 .it_reint = REINT_OPEN
1500 .it_fmt = &RQF_LDLM_INTENT,
1501 .it_flags = MUTABOR,
1502 .it_act = mdt_intent_reint,
1503 .it_reint = REINT_CREATE
1505 [MDT_IT_GETATTR] = {
1506 .it_fmt = &RQF_LDLM_INTENT_GETATTR,
1507 .it_flags = HABEO_REFERO,
1508 .it_act = mdt_intent_getattr
1510 [MDT_IT_READDIR] = {
1516 .it_fmt = &RQF_LDLM_INTENT_GETATTR,
1517 .it_flags = HABEO_REFERO,
1518 .it_act = mdt_intent_getattr
1521 .it_fmt = &RQF_LDLM_INTENT_UNLINK,
1522 .it_flags = MUTABOR,
1523 .it_act = NULL, /* XXX can be mdt_intent_reint, ? */
1524 .it_reint = REINT_UNLINK
1528 .it_flags = MUTABOR,
1531 [MDT_IT_GETXATTR] = {
/*
 * Intent policy for getattr/lookup intents. Takes the name lock via
 * mdt_getattr_name_lock() and, on success, replaces the client's enqueued
 * lock (*lockp) with the lock actually granted (ELDLM_LOCK_REPLACED), or
 * aborts the enqueue (ELDLM_LOCK_ABORTED) when the lookup is negative or
 * failed. MDT_IT_LOOKUP requests LOOKUP bits only; MDT_IT_GETATTR also
 * requests UPDATE bits.
 */
1538 static int mdt_intent_getattr(enum mdt_it_code opcode,
1539 struct mdt_thread_info *info,
1540 struct ldlm_lock **lockp,
1543 struct ldlm_lock *old_lock = *lockp;
1544 struct ldlm_lock *new_lock = NULL;
1545 struct ptlrpc_request *req = mdt_info_req(info);
1546 struct ldlm_reply *ldlm_rep;
1547 struct mdt_lock_handle tmp_lock;
1548 struct mdt_lock_handle *lhc = &tmp_lock;
1549 struct mdt_device *mdt = info->mti_mdt;
/* Lookup-only intent: just the LOOKUP ibit. */
1557 child_bits = MDS_INODELOCK_LOOKUP;
1559 case MDT_IT_GETATTR:
1560 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1563 CERROR("Unhandled till now");
1567 ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1568 mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
1570 ldlm_rep->lock_policy_res2 =
1571 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
1572 mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1);
/* A negative lookup is a success from the intent's point of view. */
1574 if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
1575 ldlm_rep->lock_policy_res2 = 0;
1576 if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
1577 ldlm_rep->lock_policy_res2) {
1578 RETURN(ELDLM_LOCK_ABORTED);
1581 new_lock = ldlm_handle2lock(&lhc->mlh_lh);
1582 if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1585 LASSERTF(new_lock != NULL, "op %d lockh "LPX64"\n",
1586 opcode, lhc->mlh_lh.cookie);
1590 /* FIXME:This only happens when MDT can handle RESENT */
1591 if (new_lock->l_export == req->rq_export) {
1592 /* Already gave this to the client, which means that we
1593 * reconstructed a reply. */
1594 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1596 RETURN(ELDLM_LOCK_REPLACED);
1600 * These are copied from mds/hander.c, and should be factored into
1601 * ldlm module in order to share these code, and be easy for merge.
1604 /* Fixup the lock to be given to the client */
1605 lock_res_and_lock(new_lock);
1606 new_lock->l_readers = 0;
1607 new_lock->l_writers = 0;
/* Hand ownership of the lock to the client's export; ASTs and the
 * remote handle are copied from the lock the client enqueued. */
1609 new_lock->l_export = class_export_get(req->rq_export);
1610 list_add(&new_lock->l_export_chain,
1611 &new_lock->l_export->exp_ldlm_data.led_held_locks);
1613 new_lock->l_blocking_ast = old_lock->l_blocking_ast;
1614 new_lock->l_completion_ast = old_lock->l_completion_ast;
1616 new_lock->l_remote_handle = old_lock->l_remote_handle;
1618 new_lock->l_flags &= ~LDLM_FL_LOCAL;
1620 unlock_res_and_lock(new_lock);
1621 LDLM_LOCK_PUT(new_lock);
1623 RETURN(ELDLM_LOCK_REPLACED);
/*
 * Intent policy for reintegration intents (open/create). Unpacks the
 * reint opcode, verifies it matches the intent's expected it_reint,
 * executes the reintegration via mdt_reint_internal(), stores the result
 * in the DLM reply, and always aborts the lock enqueue (the intent's
 * purpose is the reint, not the lock).
 */
1626 static int mdt_intent_reint(enum mdt_it_code opcode,
1627 struct mdt_thread_info *info,
1628 struct ldlm_lock **lockp,
1633 struct ldlm_reply *rep;
1635 static const struct req_format *intent_fmts[REINT_MAX] = {
1636 [REINT_CREATE] = &RQF_LDLM_INTENT_CREATE,
1637 [REINT_OPEN] = &RQF_LDLM_INTENT_OPEN
1642 opc = mdt_reint_opcode(info, intent_fmts);
/* The reint opcode in the request must match the intent flavor. */
1646 if (mdt_it_flavor[opcode].it_reint != opc) {
1647 CERROR("Reint code %ld doesn't match intent: %d\n",
1652 rc = mdt_reint_internal(info, opc);
1654 rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1657 rep->lock_policy_res2 = rc;
1659 mdt_set_disposition(info, rep, DISP_IT_EXECD);
1661 mdt_finish_reply(info, rc);
1663 RETURN(ELDLM_LOCK_ABORTED);
/*
 * Map an LDLM wire intent code (IT_*) to the internal MDT_IT_* index
 * used in mdt_it_flavor[]. Unknown codes are logged as errors.
 */
1666 static int mdt_intent_code(long itcode)
1672 result = MDT_IT_OPEN;
1674 case IT_OPEN|IT_CREAT:
1675 result = MDT_IT_OCREAT;
1678 result = MDT_IT_CREATE;
1681 result = MDT_IT_READDIR;
1684 result = MDT_IT_GETATTR;
1687 result = MDT_IT_LOOKUP;
1690 result = MDT_IT_UNLINK;
1693 result = MDT_IT_TRUNC;
1696 result = MDT_IT_GETXATTR;
1699 CERROR("Unknown intent opcode: %ld\n", itcode);
/*
 * Dispatch an intent: translate the wire code, extend the request capsule
 * with the flavor's format, unpack the request/pack the reply, enforce
 * read-only exports for MUTABOR (modifying) intents, then run the
 * flavor's policy callback.
 */
1706 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
1707 struct ldlm_lock **lockp, int flags)
1709 struct req_capsule *pill;
1710 struct mdt_it_flavor *flv;
1715 opc = mdt_intent_code(itopc);
1719 pill = &info->mti_pill;
1720 flv = &mdt_it_flavor[opc];
1722 if (flv->it_fmt != NULL)
1723 req_capsule_extend(pill, flv->it_fmt);
1725 rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
1727 struct ptlrpc_request *req = mdt_info_req(info);
/* Reject modifying intents from exports connected read-only. */
1728 if (flv->it_flags & MUTABOR &&
1729 req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1732 if (rc == 0 && flv->it_act != NULL) {
1733 /* execute policy */
1734 rc = flv->it_act(opc, info, lockp, flags);
/*
 * LDLM namespace intent policy callback, registered via
 * ldlm_register_intent() in mdt_init0(). If the request carries an
 * intent buffer, handle it through mdt_intent_opc(); otherwise this is a
 * plain enqueue and only the reply capsule is packed.
 */
1740 static int mdt_intent_policy(struct ldlm_namespace *ns,
1741 struct ldlm_lock **lockp, void *req_cookie,
1742 ldlm_mode_t mode, int flags, void *data)
1744 struct mdt_thread_info *info;
1745 struct ptlrpc_request *req = req_cookie;
1746 struct ldlm_intent *it;
1747 struct req_capsule *pill;
1748 struct ldlm_lock *lock = *lockp;
1753 LASSERT(req != NULL);
1755 info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
1756 LASSERT(info != NULL);
1757 pill = &info->mti_pill;
1758 LASSERT(pill->rc_req == req);
/* An intent buffer is present only when bufcount exceeds the intent
 * offset. */
1760 if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
1761 req_capsule_extend(pill, &RQF_LDLM_INTENT);
1762 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
1764 LDLM_DEBUG(lock, "intent policy opc: %s",
1765 ldlm_it2str(it->opc));
1767 rc = mdt_intent_opc(it->opc, info, lockp, flags);
1773 /* No intent was provided */
1774 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
1775 rc = req_capsule_pack(pill);
/*
 * Finalize and free the site's server-side sequence managers (both the
 * regular server sequence and, if present, the controller sequence).
 * Safe to call when either is already NULL.
 */
1783 static int mdt_seq_fini(const struct lu_context *ctx,
1784 struct mdt_device *m)
1786 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1789 if (ls && ls->ls_server_seq) {
1790 seq_server_fini(ls->ls_server_seq, ctx);
1791 OBD_FREE_PTR(ls->ls_server_seq);
1792 ls->ls_server_seq = NULL;
1794 if (ls && ls->ls_control_seq) {
1795 seq_server_fini(ls->ls_control_seq, ctx);
1796 OBD_FREE_PTR(ls->ls_control_seq);
1797 ls->ls_control_seq = NULL;
/*
 * Initialize server-side sequence management for this site. Node 0 is
 * the sequence-controller node and additionally allocates/initializes a
 * controller sequence; every node gets a regular server sequence. On
 * failure, mdt_seq_fini() undoes partial initialization.
 */
1802 static int mdt_seq_init(const struct lu_context *ctx,
1804 struct mdt_device *m)
1810 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1812 /* sequence-controller node */
1813 if (ls->ls_node_id == 0) {
1814 LASSERT(ls->ls_control_seq == NULL);
1815 OBD_ALLOC_PTR(ls->ls_control_seq);
1817 if (ls->ls_control_seq != NULL) {
1818 rc = seq_server_init(ls->ls_control_seq,
1819 m->mdt_bottom, uuid,
1820 LUSTRE_SEQ_CONTROLLER,
1826 LASSERT(ls->ls_server_seq == NULL);
1827 OBD_ALLOC_PTR(ls->ls_server_seq);
1829 if (ls->ls_server_seq != NULL) {
1830 rc = seq_server_init(ls->ls_server_seq,
1831 m->mdt_bottom, uuid,
/* Error path: undo whatever was set up above. */
1838 mdt_seq_fini(ctx, m);
1844 * Init client sequence manager which is used by local MDS to talk to sequence
1845 * controller on remote node.
/*
 * Init client sequence manager used by the local MDS to talk to the
 * sequence controller on a remote node (see comment above). Triggered
 * from mdt_process_config() on the first MDC add (index 0): connects to
 * the controller MDC, allocates a seq client, wires it into the MDC and
 * into the local server sequence.
 */
1847 static int mdt_seq_init_cli(const struct lu_context *ctx,
1848 struct mdt_device *m,
1849 struct lustre_cfg *cfg)
1851 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1852 struct obd_device *mdc;
1853 struct obd_uuid *uuidp;
1857 struct mdt_thread_info *info;
1858 char *p, *index_string = lustre_cfg_string(cfg, 2);
1861 info = lu_context_key_get(ctx, &mdt_thread_key);
1862 uuidp = &info->mti_u.uuid;
1864 LASSERT(index_string);
1866 index = simple_strtol(index_string, &p, 10);
1868 CERROR("Invalid index in lustre_cgf, offset 2\n");
1872 /* check if this is first MDC add and controller is not yet
1874 if (index != 0 || ls->ls_client_exp)
1877 uuid_str = lustre_cfg_string(cfg, 1);
1878 obd_str2uuid(uuidp, uuid_str);
1879 mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, NULL);
1881 CERROR("can't find controller MDC by uuid %s\n",
1884 } else if (!mdc->obd_set_up) {
1885 CERROR("target %s not set up\n", mdc->obd_name);
1888 struct lustre_handle conn = {0, };
1890 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
1891 mdc->obd_name, mdc->obd_uuid.uuid);
1893 rc = obd_connect(ctx, &conn, mdc, &mdc->obd_uuid, NULL);
1896 CERROR("target %s connect error %d\n",
1899 ls->ls_client_exp = class_conn2export(&conn);
1901 OBD_ALLOC_PTR(ls->ls_client_seq);
1903 if (ls->ls_client_seq != NULL) {
1904 rc = seq_client_init(ls->ls_client_seq,
1907 LUSTRE_SEQ_METADATA);
1913 /*FIXME: add client seq to mdc obd for
1914 *allocating fid in create slave objects,
1915 *may need better way to fix it,
1916 *why not init client seq in cmm_add_mdc?*/
1917 mdc->u.cli.cl_seq = ls->ls_client_seq;
1919 LASSERT(ls->ls_server_seq != NULL);
/* Give the server sequence its client side for controller talks. */
1921 rc = seq_server_set_cli(ls->ls_server_seq,
/*
 * Tear down the client sequence manager set up by mdt_seq_init_cli():
 * detach it from the server sequence, finalize and free it, and
 * disconnect the controller export.
 */
1930 static void mdt_seq_fini_cli(struct mdt_device *m)
1936 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1938 if (ls && ls->ls_server_seq)
1939 seq_server_set_cli(ls->ls_server_seq,
1942 if (ls && ls->ls_client_seq) {
1943 seq_client_fini(ls->ls_client_seq);
1944 OBD_FREE_PTR(ls->ls_client_seq);
1945 ls->ls_client_seq = NULL;
1948 if (ls && ls->ls_client_exp) {
1949 int rc = obd_disconnect(ls->ls_client_exp);
1950 ls->ls_client_exp = NULL;
1953 CERROR("failure to disconnect "
/*
 * Allocate and initialize the site's FLD (FID location database) server
 * on top of the bottom dt device; frees the allocation on init failure.
 */
1963 static int mdt_fld_init(const struct lu_context *ctx,
1965 struct mdt_device *m)
1971 ls = m->mdt_md_dev.md_lu_dev.ld_site;
1973 OBD_ALLOC_PTR(ls->ls_server_fld);
1975 if (ls->ls_server_fld != NULL) {
1976 rc = fld_server_init(ls->ls_server_fld, ctx,
1977 m->mdt_bottom, uuid);
/* Init failed: release the allocation and clear the pointer. */
1979 OBD_FREE_PTR(ls->ls_server_fld);
1980 ls->ls_server_fld = NULL;
/* Finalize and free the site's FLD server; a no-op when not set up. */
1988 static int mdt_fld_fini(const struct lu_context *ctx,
1989 struct mdt_device *m)
1991 struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1994 if (ls && ls->ls_server_fld) {
1995 fld_server_fini(ls->ls_server_fld, ctx);
1996 OBD_FREE_PTR(ls->ls_server_fld);
1997 ls->ls_server_fld = NULL;
2002 /* device init/fini methods */
/*
 * Unregister the three MDT ptlrpc services (regular, readpage, setattr),
 * skipping any that were never started. Counterpart of
 * mdt_start_ptlrpc_service().
 */
2003 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
2005 if (m->mdt_service != NULL) {
2006 ptlrpc_unregister_service(m->mdt_service);
2007 m->mdt_service = NULL;
2009 if (m->mdt_readpage_service != NULL) {
2010 ptlrpc_unregister_service(m->mdt_readpage_service);
2011 m->mdt_readpage_service = NULL;
2013 if (m->mdt_setattr_service != NULL) {
2014 ptlrpc_unregister_service(m->mdt_setattr_service);
2015 m->mdt_setattr_service = NULL;
/*
 * Initialize the LDLM callback client and start the three MDT ptlrpc
 * services: the regular request service, the readpage service, and the
 * setattr service, each with its own portal but otherwise similar
 * parameters. On any failure, everything started so far is stopped via
 * mdt_stop_ptlrpc_service().
 *
 * NOTE(review): conf is a function-local `static`, so concurrent starts
 * of two MDT devices would race on it — presumably setup is serialized
 * by the caller; confirm.
 */
2019 static int mdt_start_ptlrpc_service(struct mdt_device *m)
2022 static struct ptlrpc_service_conf conf;
2025 conf = (typeof(conf)) {
2026 .psc_nbufs = MDS_NBUFS,
2027 .psc_bufsize = MDS_BUFSIZE,
2028 .psc_max_req_size = MDS_MAXREQSIZE,
2029 .psc_max_reply_size = MDS_MAXREPSIZE,
2030 .psc_req_portal = MDS_REQUEST_PORTAL,
2031 .psc_rep_portal = MDC_REPLY_PORTAL,
2032 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2034 * We'd like to have a mechanism to set this on a per-device
2035 * basis, but alas...
2037 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2039 .psc_ctx_tags = LCT_MD_THREAD
2042 m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
2043 ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2044 "mdt_ldlm_client", m->mdt_ldlm_client);
2047 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT0_NAME,
2048 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2050 if (m->mdt_service == NULL)
2053 rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME)
2055 GOTO(err_mdt_svc, rc);
2058 * readpage service configuration. Parameters have to be adjusted,
2061 conf = (typeof(conf)) {
2062 .psc_nbufs = MDS_NBUFS,
2063 .psc_bufsize = MDS_BUFSIZE,
2064 .psc_max_req_size = MDS_MAXREQSIZE,
2065 .psc_max_reply_size = MDS_MAXREPSIZE,
2066 .psc_req_portal = MDS_READPAGE_PORTAL,
2067 .psc_rep_portal = MDC_REPLY_PORTAL,
2068 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2069 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2071 .psc_ctx_tags = LCT_MD_THREAD
2073 m->mdt_readpage_service =
2074 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
2075 LUSTRE_MDT0_NAME "_readpage",
2076 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2079 if (m->mdt_readpage_service == NULL) {
2080 CERROR("failed to start readpage service\n");
2081 GOTO(err_mdt_svc, rc = -ENOMEM);
2084 rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
2087 * setattr service configuration.
2089 conf = (typeof(conf)) {
2090 .psc_nbufs = MDS_NBUFS,
2091 .psc_bufsize = MDS_BUFSIZE,
2092 .psc_max_req_size = MDS_MAXREQSIZE,
2093 .psc_max_reply_size = MDS_MAXREPSIZE,
2094 .psc_req_portal = MDS_SETATTR_PORTAL,
2095 .psc_rep_portal = MDC_REPLY_PORTAL,
2096 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2097 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
2099 .psc_ctx_tags = LCT_MD_THREAD
2102 m->mdt_setattr_service =
2103 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
2104 LUSTRE_MDT0_NAME "_setattr",
2105 m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2108 if (!m->mdt_setattr_service) {
2109 CERROR("failed to start setattr service\n");
2110 GOTO(err_mdt_svc, rc = -ENOMEM);
2113 rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
2115 GOTO(err_mdt_svc, rc);
/* Error path: stop whatever services did start. */
2120 mdt_stop_ptlrpc_service(m);
/*
 * Tear down the device stack below the MDT: send an LCFG_CLEANUP config
 * down the stack, purge the site's object cache, then walk the layers
 * from `top` down calling ldto_device_fini()/ldto_device_free() on each
 * and releasing the obd type references.
 */
2125 static void mdt_stack_fini(const struct lu_context *ctx,
2126 struct mdt_device *m, struct lu_device *top)
2128 struct lu_device *d = top, *n;
2129 struct lustre_cfg_bufs *bufs;
2130 struct lustre_cfg *lcfg;
2131 struct mdt_thread_info *info;
2134 info = lu_context_key_get(ctx, &mdt_thread_key);
2135 LASSERT(info != NULL);
2137 bufs = &info->mti_u.bufs;
2138 /* process cleanup */
2139 lustre_cfg_bufs_reset(bufs, NULL);
2140 lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
2142 CERROR("Cannot alloc lcfg!\n");
2145 top->ld_ops->ldo_process_config(ctx, top, lcfg);
/* Drop all cached objects before freeing the layers. */
2147 lu_site_purge(ctx, top->ld_site, ~0);
2149 struct obd_type *type;
2150 struct lu_device_type *ldt = d->ld_type;
2154 /* each fini() returns next device in stack of layers
2155 * * so we can avoid the recursion */
2156 n = ldt->ldt_ops->ldto_device_fini(ctx, d);
2157 ldt->ldt_ops->ldto_device_free(ctx, d);
2158 type = ldt->ldt_obd_type;
2160 class_put_type(type);
2161 /* switch to the next device in the layer */
2164 m->mdt_child = NULL;
/*
 * Allocate and initialize one layer of the device stack on top of
 * `child`: look up the obd type by name, allocate a device of that type,
 * inherit the child's site, and run ldto_device_init(). Returns the new
 * device, with goto-based unwinding (free device, put type) on failure.
 */
2167 static struct lu_device *mdt_layer_setup(const struct lu_context *ctx,
2168 const char *typename,
2169 struct lu_device *child,
2170 struct lustre_cfg *cfg)
2172 struct obd_type *type;
2173 struct lu_device_type *ldt;
2174 struct lu_device *d;
2178 type = class_get_type(typename);
2180 CERROR("Unknown type: '%s'\n", typename);
2181 GOTO(out, rc = -ENODEV);
2186 CERROR("type: '%s'\n", typename);
2187 GOTO(out_type, rc = -EINVAL);
2190 ldt->ldt_obd_type = type;
2191 d = ldt->ldt_ops->ldto_device_alloc(ctx, ldt, cfg);
2193 CERROR("Cannot allocate device: '%s'\n", typename);
2194 GOTO(out_type, rc = -ENODEV);
/* Share the child's lu_site with the new layer. */
2197 LASSERT(child->ld_site);
2198 d->ld_site = child->ld_site;
2201 rc = ldt->ldt_ops->ldto_device_init(ctx, d, child);
2203 CERROR("can't init device '%s', rc %d\n", typename, rc);
2204 GOTO(out_alloc, rc);
2210 ldt->ldt_ops->ldto_device_free(ctx, d);
2213 class_put_type(type);
/*
 * Build the MDT's underlying device stack: OSD (bottom) -> MDD -> CMM,
 * wiring each layer's upcall device, recording the bottom dt device and
 * the child md device, then sending the setup config down the stack.
 * On failure the partially built stack is torn down via mdt_stack_fini().
 */
2218 static int mdt_stack_init(const struct lu_context *ctx,
2219 struct mdt_device *m, struct lustre_cfg *cfg)
2221 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2222 struct lu_device *tmp;
2223 struct md_device *md;
2227 /* init the stack */
2228 tmp = mdt_layer_setup(ctx, LUSTRE_OSD0_NAME, d, cfg);
2230 RETURN(PTR_ERR(tmp));
2232 m->mdt_bottom = lu2dt_dev(tmp);
2234 tmp = mdt_layer_setup(ctx, LUSTRE_MDD0_NAME, d, cfg);
2236 GOTO(out, rc = PTR_ERR(tmp));
2241 tmp = mdt_layer_setup(ctx, LUSTRE_CMM0_NAME, d, cfg);
2243 GOTO(out, rc = PTR_ERR(tmp));
2246 /*set mdd upcall device*/
2247 md->md_upcall.mu_upcall_dev = lu2md_dev(d);
2250 /*set cmm upcall device*/
2251 md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
2253 m->mdt_child = lu2md_dev(d);
2255 /* process setup config */
2256 tmp = &m->mdt_md_dev.md_lu_dev;
2257 rc = tmp->ld_ops->ldo_process_config(ctx, tmp, cfg);
2260 /* fini from last known good lu_device */
2262 mdt_stack_fini(ctx, m, d);
/*
 * Full teardown of an MDT device, reversing mdt_init0(): filesystem
 * cleanup, ping evictor and ptlrpc services stop, seq/fld managers fini,
 * stack teardown, LDLM namespace free, and finally md_device_fini once
 * the last reference is gone.
 */
2267 static void mdt_fini(const struct lu_context *ctx, struct mdt_device *m)
2269 struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
2270 struct lu_site *ls = d->ld_site;
2274 mdt_fs_cleanup(ctx, m);
2275 ping_evictor_stop();
2276 mdt_stop_ptlrpc_service(m);
2278 mdt_seq_fini(ctx, m);
2279 mdt_seq_fini_cli(m);
2281 mdt_fld_fini(ctx, m);
2283 /* finish the stack */
2284 mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2286 if (m->mdt_namespace != NULL) {
2287 ldlm_namespace_free(m->mdt_namespace, 0);
2288 m->mdt_namespace = NULL;
/* All references must be gone before the device is finalized. */
2295 LASSERT(atomic_read(&d->ld_ref) == 0);
2296 md_device_fini(&m->mdt_md_dev);
/*
 * First-stage initialization of an MDT device from a lustre_cfg:
 * initializes locks, defaults and options, the md device and lu_site,
 * builds the underlying stack, sets the server node id from the config,
 * initializes FLD and SEQ, creates the LDLM namespace with the intent
 * policy, starts the ptlrpc services and the ping evictor, and sets up
 * the backing filesystem. Unwinds in reverse order on any failure.
 */
2301 static int mdt_init0(const struct lu_context *ctx, struct mdt_device *m,
2302 struct lu_device_type *t, struct lustre_cfg *cfg)
2304 struct mdt_thread_info *info;
2305 struct obd_device *obd;
2306 const char *dev = lustre_cfg_string(cfg, 0);
2307 const char *num = lustre_cfg_string(cfg, 2);
2312 info = lu_context_key_get(ctx, &mdt_thread_key);
2313 LASSERT(info != NULL);
2315 obd = class_name2obd(dev);
2318 spin_lock_init(&m->mdt_transno_lock);
2320 /* FIXME: We need to load them from disk. But now fake it */
2321 m->mdt_last_transno = 1;
2322 m->mdt_last_committed = 1;
2324 m->mdt_max_mdsize = MAX_MD_SIZE;
2325 m->mdt_max_cookiesize = sizeof(struct llog_cookie);
2327 spin_lock_init(&m->mdt_epoch_lock);
2328 /* Temporary. should parse mount option. */
2329 m->mdt_opts.mo_user_xattr = 0;
2330 m->mdt_opts.mo_acl = 0;
2331 m->mdt_opts.mo_compat_resname = 0;
2332 obd->obd_replayable = 1;
2339 md_device_init(&m->mdt_md_dev, t);
2340 m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
2341 m->mdt_md_dev.md_lu_dev.ld_obd = obd;
2343 rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
2345 CERROR("can't init lu_site, rc %d\n", rc);
2346 GOTO(err_free_site, rc);
2349 /* init the stack */
2350 rc = mdt_stack_init(ctx, m, cfg);
2352 CERROR("can't init device stack, rc %d\n", rc);
2353 GOTO(err_fini_site, rc);
2355 /* set server index */
2357 s->ls_node_id = simple_strtol(num, NULL, 10);
2359 rc = mdt_fld_init(ctx, obd->obd_name, m);
2361 GOTO(err_fini_stack, rc);
2363 rc = mdt_seq_init(ctx, obd->obd_name, m);
2365 GOTO(err_fini_fld, rc);
/* Per-device namespace name, unique via the device pointer. */
2367 snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
2368 LUSTRE_MDT0_NAME"-%p", m);
2369 m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
2370 LDLM_NAMESPACE_SERVER);
2371 if (m->mdt_namespace == NULL)
2372 GOTO(err_fini_seq, rc = -ENOMEM);
2374 ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
2376 rc = mdt_start_ptlrpc_service(m);
2378 GOTO(err_free_ns, rc);
2380 ping_evictor_start();
2381 rc = mdt_fs_setup(ctx, m);
2383 GOTO(err_stop_service, rc);
/* Error unwinding, reverse order of initialization. */
2387 mdt_stop_ptlrpc_service(m);
2389 ldlm_namespace_free(m->mdt_namespace, 0);
2390 m->mdt_namespace = NULL;
2392 mdt_seq_fini(ctx, m);
2394 mdt_fld_fini(ctx, m);
2396 mdt_stack_fini(ctx, m, md2lu_dev(m->mdt_child));
2402 md_device_fini(&m->mdt_md_dev);
2406 /* used by MGS to process specific configurations */
2407 static int mdt_process_config(const struct lu_context *ctx,
2408 struct lu_device *d, struct lustre_cfg *cfg)
2410 struct mdt_device *m = mdt_dev(d);
2411 struct md_device *md_next = m->mdt_child;
2412 struct lu_device *next = md2lu_dev(md_next);
2416 switch (cfg->lcfg_command) {
2419 * Add mdc hook to get first MDT uuid and connect it to
2420 * ls->controller to use for seq manager.
2422 err = mdt_seq_init_cli(ctx, mdt_dev(d), cfg);
2424 CERROR("can't initialize controller export, "
2428 /* others are passed further */
2429 err = next->ld_ops->ldo_process_config(ctx, next, cfg);
/*
 * lu_device operation: allocate an mdt_object, initialize its embedded
 * lu_object and header, install it as the top object of the header and
 * set the MDT object operations.
 */
2435 static struct lu_object *mdt_object_alloc(const struct lu_context *ctxt,
2436 const struct lu_object_header *hdr,
2437 struct lu_device *d)
2439 struct mdt_object *mo;
2445 struct lu_object *o;
2446 struct lu_object_header *h;
2448 o = &mo->mot_obj.mo_lu;
2449 h = &mo->mot_header;
2450 lu_object_header_init(h);
2451 lu_object_init(o, h, d);
2452 lu_object_add_top(h, o);
2453 o->lo_ops = &mdt_obj_ops;
/*
 * lu_object operation: allocate the corresponding object in the child
 * (md) layer and link it below this MDT object in the layered object.
 */
2459 static int mdt_object_init(const struct lu_context *ctxt, struct lu_object *o)
2461 struct mdt_device *d = mdt_dev(o->lo_dev);
2462 struct lu_device *under;
2463 struct lu_object *below;
2467 CDEBUG(D_INODE, "object init, fid = "DFID"\n",
2468 PFID(lu_object_fid(o)));
2470 under = &d->mdt_child->md_lu_dev;
2471 below = under->ld_ops->ldo_object_alloc(ctxt, o->lo_header, under);
2472 if (below != NULL) {
2473 lu_object_add(o, below);
/* lu_object operation: finalize the header and free the mdt_object. */
2479 static void mdt_object_free(const struct lu_context *ctxt, struct lu_object *o)
2481 struct mdt_object *mo = mdt_obj(o);
2482 struct lu_object_header *h;
2486 CDEBUG(D_INODE, "object free, fid = "DFID"\n",
2487 PFID(lu_object_fid(o)));
2490 lu_object_header_fini(h);
/* lu_object operation: print a one-line description of the object. */
2495 static int mdt_object_print(const struct lu_context *ctxt, void *cookie,
2496 lu_printer_t p, const struct lu_object *o)
2498 return (*p)(ctxt, cookie, LUSTRE_MDT0_NAME"-object@%p", o);
/* Device-level operations for the MDT lu_device. */
2501 static struct lu_device_operations mdt_lu_ops = {
2502 .ldo_object_alloc = mdt_object_alloc,
2503 .ldo_process_config = mdt_process_config
/* Object-level operations for MDT objects. */
2506 static struct lu_object_operations mdt_obj_ops = {
2507 .loo_object_init = mdt_object_init,
2508 .loo_object_free = mdt_object_free,
2509 .loo_object_print = mdt_object_print
/*
 * Negotiate connect flags with the client (copied from
 * mds_connect_internal): mask flags/ibits to what this MDT supports,
 * fall back to plain locks when no ibits are known, strip ACL/xattr
 * flags when the corresponding mount options are off, record the result
 * on the export, and register the client (mdt_client_add).
 */
2512 /* mds_connect_internal */
2513 static int mdt_connect_internal(const struct lu_context *ctx,
2514 struct mdt_device *mdt,
2515 struct obd_export *exp,
2516 struct obd_uuid *cluuid,
2517 struct obd_connect_data *data)
2519 struct mdt_export_data *med = &exp->exp_mdt_data;
2520 struct mdt_client_data *mcd;
2524 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
2525 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
2527 /* If no known bits (which should not happen, probably,
2528 as everybody should support LOOKUP and UPDATE bits at least)
2529 revert to compat mode with plain locks. */
2530 if (!data->ocd_ibits_known &&
2531 data->ocd_connect_flags & OBD_CONNECT_IBITS)
2532 data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
2534 if (!mdt->mdt_opts.mo_acl)
2535 data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
2537 if (!mdt->mdt_opts.mo_user_xattr)
2538 data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
2540 exp->exp_connect_flags = data->ocd_connect_flags;
2541 data->ocd_version = LUSTRE_VERSION_CODE;
2542 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
/* ACL required on the server but not offered by the client: warn. */
2545 if (mdt->mdt_opts.mo_acl &&
2546 ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
2547 CWARN("%s: MDS requires ACL support but client does not\n",
2548 mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
2554 memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
2556 rc = mdt_client_add(ctx, mdt, med, -1);
/*
 * obd_ops connect method (copied from mds_connect): validate arguments,
 * create the connection/export via class_connect(), then negotiate flags
 * with mdt_connect_internal(); the export is disconnected again if
 * negotiation fails, and its reference is always dropped before return.
 */
2565 /* mds_connect copy */
2566 static int mdt_obd_connect(const struct lu_context *ctx,
2567 struct lustre_handle *conn, struct obd_device *obd,
2568 struct obd_uuid *cluuid,
2569 struct obd_connect_data *data)
2571 struct obd_export *exp;
2572 struct mdt_device *mdt;
2576 LASSERT(ctx != NULL);
2577 if (!conn || !obd || !cluuid)
2580 mdt = mdt_dev(obd->obd_lu_dev);
2582 rc = class_connect(conn, obd, cluuid);
2586 exp = class_conn2export(conn);
2587 LASSERT(exp != NULL);
2589 rc = mdt_connect_internal(ctx, mdt, exp, cluuid, data);
/* Negotiation failed: undo the connect. */
2591 class_disconnect(exp);
2593 class_export_put(exp);
/*
 * obd_ops disconnect method: disconnect the export early so clients
 * cannot keep using it, then complete (schedule) all outstanding
 * difficult replies attached to the export. An extra export reference is
 * held across the operation.
 */
2598 static int mdt_obd_disconnect(struct obd_export *exp)
2604 class_export_get(exp);
2606 /* Disconnect early so that clients can't keep using export */
2607 rc = class_disconnect(exp);
2608 //ldlm_cancel_locks_for_export(exp);
2610 /* complete all outstanding replies */
2611 spin_lock(&exp->exp_lock);
2612 while (!list_empty(&exp->exp_outstanding_replies)) {
2613 struct ptlrpc_reply_state *rs =
2614 list_entry(exp->exp_outstanding_replies.next,
2615 struct ptlrpc_reply_state, rs_exp_list);
2616 struct ptlrpc_service *svc = rs->rs_service;
/* Reply-state list manipulation requires the service lock. */
2618 spin_lock(&svc->srv_lock);
2619 list_del_init(&rs->rs_exp_list);
2620 ptlrpc_schedule_difficult_reply(rs);
2621 spin_unlock(&svc->srv_lock);
2623 spin_unlock(&exp->exp_lock);
2625 class_export_put(exp);
/*
 * obd_ops init_export method: set up the per-export open-files list and
 * its lock, and mark the export as connecting.
 */
2629 /* FIXME: Can we avoid using these two interfaces? */
2630 static int mdt_init_export(struct obd_export *exp)
2632 struct mdt_export_data *med = &exp->exp_mdt_data;
2635 INIT_LIST_HEAD(&med->med_open_head);
2636 spin_lock_init(&med->med_open_lock);
2637 exp->exp_connecting = 1;
/*
 * obd_ops destroy_export method: run target_destroy_export(), then (for
 * non-self exports) set up a temporary lu_context and close every file
 * the client still had open — dropping the open lock around each close
 * since mdt_mfd_close() may block — and finally free the client record.
 */
2641 static int mdt_destroy_export(struct obd_export *export)
2643 struct mdt_export_data *med;
2644 struct obd_device *obd = export->exp_obd;
2645 struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
2646 struct mdt_thread_info *info;
2647 struct lu_context ctxt;
2651 med = &export->exp_mdt_data;
2652 target_destroy_export(export);
/* The obd's own "self" export has no client state to clean up. */
2654 if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
2657 rc = lu_context_init(&ctxt, LCT_MD_THREAD);
2661 lu_context_enter(&ctxt);
2663 info = lu_context_key_get(&ctxt, &mdt_thread_key);
2664 LASSERT(info != NULL);
2665 memset(info, 0, sizeof *info);
2666 /* Close any open files (which may also cause orphan unlinking). */
2667 spin_lock(&med->med_open_lock);
2668 while (!list_empty(&med->med_open_head)) {
2669 struct list_head *tmp = med->med_open_head.next;
2670 struct mdt_file_data *mfd =
2671 list_entry(tmp, struct mdt_file_data, mfd_list);
2672 struct mdt_object *o = mfd->mfd_object;
2674 /* Remove mfd handle so it can't be found again.
2675 * We are consuming the mfd_list reference here. */
2676 class_handle_unhash(&mfd->mfd_handle);
2677 list_del_init(&mfd->mfd_list);
/* Drop the spinlock while closing; mdt_mfd_close() can sleep. */
2678 spin_unlock(&med->med_open_lock);
2679 mdt_mfd_close(&ctxt, mdt, mfd, &info->mti_attr);
2680 /* TODO: if we close the unlinked file,
2681 * we need to remove it's objects from OST */
2682 mdt_object_put(&ctxt, o);
2683 spin_lock(&med->med_open_lock);
2685 spin_unlock(&med->med_open_lock);
2686 mdt_client_free(&ctxt, mdt, med);
2688 lu_context_exit(&ctxt);
2689 lu_context_fini(&ctxt);
/*
 * Upcall from the child md device (installed in mdt_device_alloc): on a
 * max-size change event, refresh mdt_max_mdsize/mdt_max_cookiesize from
 * the child via mdo_get_maxsize(). Unknown events are logged.
 */
2694 static int mdt_upcall(const struct lu_context *ctx, struct md_device *md,
2695 enum md_upcall_event ev)
2697 struct mdt_device *m = mdt_dev(&md->md_lu_dev);
2698 struct md_device *next = m->mdt_child;
2704 rc = next->md_ops->mdo_get_maxsize(ctx, next,
2706 &m->mdt_max_cookiesize);
2707 CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
2708 m->mdt_max_mdsize, m->mdt_max_cookiesize);
2711 CERROR("invalid event\n");
/* OBD method table registered for the MDT device type. */
2719 static struct obd_ops mdt_obd_device_ops = {
2720 .o_owner = THIS_MODULE,
2721 .o_connect = mdt_obd_connect,
2722 .o_disconnect = mdt_obd_disconnect,
2723 .o_init_export = mdt_init_export,
2724 .o_destroy_export = mdt_destroy_export,
/* lu_device_type operation: finalize an MDT device. */
2727 static struct lu_device* mdt_device_fini(const struct lu_context *ctx,
2728 struct lu_device *d)
2730 struct mdt_device *m = mdt_dev(d);
/* lu_device_type operation: free an MDT device's memory. */
2736 static void mdt_device_free(const struct lu_context *ctx, struct lu_device *d)
2738 struct mdt_device *m = mdt_dev(d);
/*
 * lu_device_type operation: allocate an mdt_device, run first-stage init
 * (mdt_init0) and install mdt_upcall as the child's upcall handler.
 * Returns the embedded lu_device, or ERR_PTR on allocation/init failure.
 */
2743 static struct lu_device *mdt_device_alloc(const struct lu_context *ctx,
2744 struct lu_device_type *t,
2745 struct lustre_cfg *cfg)
2747 struct lu_device *l;
2748 struct mdt_device *m;
2754 l = &m->mdt_md_dev.md_lu_dev;
2755 result = mdt_init0(ctx, m, t, cfg);
2758 l = ERR_PTR(result);
2760 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
2762 l = ERR_PTR(-ENOMEM);
2767 * context key constructor/destructor
/*
 * Context-key constructor: allocate the per-thread mdt_thread_info.
 * CLASSERT guarantees it fits in one page (no high-order allocation).
 */
2769 static void *mdt_thread_init(const struct lu_context *ctx,
2770 struct lu_context_key *key)
2772 struct mdt_thread_info *info;
2775 * check that no high order allocations are incurred.
2777 CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
2778 OBD_ALLOC_PTR(info);
2780 info = ERR_PTR(-ENOMEM);
/* Context-key destructor: free the per-thread mdt_thread_info. */
2784 static void mdt_thread_fini(const struct lu_context *ctx,
2785 struct lu_context_key *key, void *data)
2787 struct mdt_thread_info *info = data;
/* Context key giving each MD service thread its mdt_thread_info. */
2791 struct lu_context_key mdt_thread_key = {
2792 .lct_tags = LCT_MD_THREAD,
2793 .lct_init = mdt_thread_init,
2794 .lct_fini = mdt_thread_fini
/*
 * Context-key constructor for per-transaction info; same page-size
 * constraint as mdt_thread_init().
 */
2797 static void *mdt_txn_init(const struct lu_context *ctx,
2798 struct lu_context_key *key)
2800 struct mdt_txn_info *txi;
2803 * check that no high order allocations are incurred.
2805 CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
2808 txi = ERR_PTR(-ENOMEM);
/* Context-key destructor: free the per-transaction info. */
2812 static void mdt_txn_fini(const struct lu_context *ctx,
2813 struct lu_context_key *key, void *data)
2815 struct mdt_txn_info *txi = data;
/* Context key carrying per-transaction-handle state. */
2819 struct lu_context_key mdt_txn_key = {
2820 .lct_tags = LCT_TX_HANDLE,
2821 .lct_init = mdt_txn_init,
2822 .lct_fini = mdt_txn_fini
/* Device-type init: register the thread and transaction context keys. */
2826 static int mdt_type_init(struct lu_device_type *t)
2830 rc = lu_context_key_register(&mdt_thread_key);
2832 rc = lu_context_key_register(&mdt_txn_key);
/* Device-type fini: deregister the context keys registered above. */
2836 static void mdt_type_fini(struct lu_device_type *t)
2838 lu_context_key_degister(&mdt_thread_key);
2839 lu_context_key_degister(&mdt_txn_key);
/* lu_device_type operations vector wiring the MDT type/device lifecycle
 * hooks defined above (closing brace elided in this listing). */
2842 static struct lu_device_type_operations mdt_device_type_ops = {
2843 .ldto_init = mdt_type_init,
2844 .ldto_fini = mdt_type_fini,
2846 .ldto_device_alloc = mdt_device_alloc,
2847 .ldto_device_free = mdt_device_free,
2848 .ldto_device_fini = mdt_device_fini
/* The MDT device type: a metadata-layer (LU_DEVICE_MD) device named
 * LUSTRE_MDT0_NAME whose service threads run in LCT_MD_THREAD contexts
 * (closing brace elided in this listing). */
2851 static struct lu_device_type mdt_device_type = {
2852 .ldt_tags = LU_DEVICE_MD,
2853 .ldt_name = LUSTRE_MDT0_NAME,
2854 .ldt_ops = &mdt_device_type_ops,
2855 .ldt_ctx_tags = LCT_MD_THREAD
/* /proc variable tables for the mdt module and its obd devices; the
 * table entries are elided in this listing (presumably empty or
 * terminator-only — confirm against full source). */
2858 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
2862 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
/* Generates the lprocfs_init_vars(mdt, ...) helper used in mdt_mod_init(). */
2866 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
/*
 * Module entry point: sets the default service thread count, fills the
 * lprocfs variable table, and registers the MDT obd type under
 * LUSTRE_MDT0_NAME.  The trailing class_register_type() arguments and the
 * return are elided in this listing.
 */
2868 static int __init mdt_mod_init(void)
2871 struct lprocfs_static_vars lvars;
2873 mdt_num_threads = MDT_NUM_THREADS;
2874 lprocfs_init_vars(mdt, &lvars);
2875 result = class_register_type(&mdt_obd_device_ops, NULL,
2876 lvars.module_vars, LUSTRE_MDT0_NAME,
/* Module exit point: unregisters the obd type registered in mdt_mod_init(). */
2881 static void __exit mdt_mod_exit(void)
2883 class_unregister_type(LUSTRE_MDT0_NAME);
/*
 * DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt) builds one
 * mdt_handler array entry, indexed by opcode offset from the slice's base
 * opcode.  It token-pastes the fail id (OBD_FAIL_<prefix>_<opc><suffix>),
 * the opcode constant and the HABEO_*/MUTABOR flags; the .mh_act/.mh_fmt
 * member lines are elided in this listing.
 */
2887 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt) \
2888 [prefix ## _ ## opc - prefix ## _ ## base] = { \
2890 .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix, \
2891 .mh_opc = prefix ## _ ## opc, \
2892 .mh_flags = flags, \
/* MDS convenience wrappers: base opcode is MDS_GETATTR for all three. */
2897 #define DEF_MDT_HNDL(flags, name, fn, fmt) \
2898 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)
2900 * Request with a format known in advance
2902 #define DEF_MDT_HNDL_F(flags, name, fn) \
2903 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)
2905 * Request with a format we do not yet know
2907 #define DEF_MDT_HNDL_0(flags, name, fn) \
2908 DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
/*
 * Dispatch table for MDS_* opcodes.  Flags: HABEO_CORPUS = handler needs
 * the object body, HABEO_REFERO = reply buffers are packed generically,
 * MUTABOR = modifying request (as used elsewhere in this file).  Some
 * entries and the closing brace are elided in this listing.
 */
2910 static struct mdt_handler mdt_mds_ops[] = {
2911 DEF_MDT_HNDL_F(0, CONNECT, mdt_connect),
2912 DEF_MDT_HNDL_F(0, DISCONNECT, mdt_disconnect),
2913 DEF_MDT_HNDL_F(0 |HABEO_REFERO, GETSTATUS, mdt_getstatus),
2914 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR, mdt_getattr),
2915 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
2916 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO|MUTABOR,
2917 SETXATTR, mdt_setxattr),
2918 DEF_MDT_HNDL_F(HABEO_CORPUS, GETXATTR, mdt_getxattr),
2919 DEF_MDT_HNDL_F(0 |HABEO_REFERO, STATFS, mdt_statfs),
/* NOTE(review): the opcode/handler for this MUTABOR entry (line 2921) is
 * elided in this listing. */
2920 DEF_MDT_HNDL_F(0 |MUTABOR,
2922 DEF_MDT_HNDL_F(HABEO_CORPUS , CLOSE, mdt_close),
2923 DEF_MDT_HNDL_0(0, DONE_WRITING, mdt_done_writing),
2924 DEF_MDT_HNDL_F(0 |HABEO_REFERO, PIN, mdt_pin),
2925 DEF_MDT_HNDL_0(0, SYNC, mdt_sync),
2926 DEF_MDT_HNDL_0(0, QUOTACHECK, mdt_quotacheck_handle),
2927 DEF_MDT_HNDL_0(0, QUOTACTL, mdt_quotactl_handle)
/* OBD_* opcode wrapper (base opcode OBD_PING, no known request format). */
2930 #define DEF_OBD_HNDL(flags, name, fn) \
2931 DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
/* Dispatch table for generic OBD opcodes: ping, llog cancel, quota
 * callback (closing brace elided in this listing). */
2934 static struct mdt_handler mdt_obd_ops[] = {
2935 DEF_OBD_HNDL(0, PING, mdt_obd_ping),
2936 DEF_OBD_HNDL(0, LOG_CANCEL, mdt_obd_log_cancel),
2937 DEF_OBD_HNDL(0, QC_CALLBACK, mdt_obd_qc_callback)
/* LDLM_* opcode wrappers (base opcode LDLM_ENQUEUE, empty fail-id
 * suffix): _0 = no request format, _F = format RQF_LDLM_<name>. */
2940 #define DEF_DLM_HNDL_0(flags, name, fn) \
2941 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
2942 #define DEF_DLM_HNDL_F(flags, name, fn) \
2943 DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
/* Dispatch table for DLM opcodes; HABEO_CLAVIS marks lock-key requests
 * (closing brace elided in this listing). */
2945 static struct mdt_handler mdt_dlm_ops[] = {
2946 DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE, mdt_enqueue),
2947 DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT, mdt_convert),
2948 DEF_DLM_HNDL_0(0, BL_CALLBACK, mdt_bl_callback),
2949 DEF_DLM_HNDL_0(0, CP_CALLBACK, mdt_cp_callback)
/* Dispatch table for llog origin-handle opcodes; entries elided in this
 * listing (confirm against full source). */
2952 static struct mdt_handler mdt_llog_ops[] = {
/*
 * Opcode-slice table for the regular MDT service: maps each opcode range
 * (MDS, OBD, LDLM, LLOG) to its handler array above.  Slice braces and
 * the array terminator are elided in this listing.
 */
2955 static struct mdt_opc_slice mdt_regular_handlers[] = {
2957 .mos_opc_start = MDS_GETATTR,
2958 .mos_opc_end = MDS_LAST_OPC,
2959 .mos_hs = mdt_mds_ops
2962 .mos_opc_start = OBD_PING,
2963 .mos_opc_end = OBD_LAST_OPC,
2964 .mos_hs = mdt_obd_ops
2967 .mos_opc_start = LDLM_ENQUEUE,
2968 .mos_opc_end = LDLM_LAST_OPC,
2969 .mos_hs = mdt_dlm_ops
2972 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
2973 .mos_opc_end = LLOG_LAST_OPC,
2974 .mos_hs = mdt_llog_ops
/* Handler table for the dedicated readpage service: READPAGE plus CLOSE
 * (CLOSE duplicated here deliberately — see the mdc_close() comment
 * below).  Closing braces elided in this listing. */
2981 static struct mdt_handler mdt_readpage_ops[] = {
2982 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
2985 * XXX: this is ugly and should be fixed one day, see mdc_close() for
2986 * detailed comment. --umka
2988 DEF_MDT_HNDL_F(HABEO_CORPUS, CLOSE, mdt_close),
/* Single-slice opcode table for the readpage service. */
2991 static struct mdt_opc_slice mdt_readpage_handlers[] = {
2993 .mos_opc_start = MDS_GETATTR,
2994 .mos_opc_end = MDS_LAST_OPC,
2995 .mos_hs = mdt_readpage_ops
/* Kernel module metadata, the mdt_num_threads tunable (read-only via
 * /sys, mode 0444), and module init/exit registration. */
3002 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
3003 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT0_NAME")");
3004 MODULE_LICENSE("GPL");
3006 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
3007 "number of mdt service threads to start");
3009 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);