From 1837cd1905e500eed0bcf46d79436518982dc8cf Mon Sep 17 00:00:00 2001 From: nikita Date: Sun, 2 Apr 2006 15:21:44 +0000 Subject: [PATCH] mdt prototype updates: cleanup of mdt_handle() --- lustre/mds/handler.c | 10 +- lustre/mdt/mdt.h | 68 ++--- lustre/mdt/mdt_handler.c | 696 ++++++++++++++++++++++++++--------------------- 3 files changed, 432 insertions(+), 342 deletions(-) diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 8312d22..823e36e 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1370,7 +1370,7 @@ static int mds_msg_check_version(struct lustre_msg *msg) int mds_handle(struct ptlrpc_request *req) { int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET; - int rc = 0; + int rc; struct mds_obd *mds = NULL; /* quell gcc overwarning */ struct obd_device *obd = NULL; ENTRY; @@ -1400,7 +1400,7 @@ int mds_handle(struct ptlrpc_request *req) med = &req->rq_export->exp_mds_data; obd = req->rq_export->exp_obd; - mds = &obd->u.mds; + mds = mds_req2mds(req); /* sanity check: if the xid matches, the request must * be marked as a resent or replayed */ @@ -1439,6 +1439,12 @@ int mds_handle(struct ptlrpc_request *req) rc = target_handle_connect(req, mds_handle); if (!rc) { /* Now that we have an export, set mds. */ + /* + * XXX nikita: these assignments are useless: mds is + * never used below, and obd is only used for + * MSG_LAST_REPLAY case, which never happens for + * MDS_CONNECT. + */ obd = req->rq_export->exp_obd; mds = mds_req2mds(req); } diff --git a/lustre/mdt/mdt.h b/lustre/mdt/mdt.h index 45cad82..ae870af 100644 --- a/lustre/mdt/mdt.h +++ b/lustre/mdt/mdt.h @@ -37,8 +37,8 @@ struct ptlrpc_service_conf { struct md_object; struct md_device { - struct lu_device md_lu_dev; - struct md_device_operations *md_ops; + struct lu_device md_lu_dev; + struct md_device_operations *md_ops; }; struct md_device_operations { @@ -61,7 +61,7 @@ struct mdt_device { }; struct md_object { - struct lu_object mo_lu; + struct lu_object mo_lu; }; static inline int lu_device_is_md(struct lu_device *d) @@ -72,22 +72,22 @@ static inline int lu_device_is_md(struct lu_device *d) static inline struct md_object *lu2md(struct lu_object *o) { LASSERT(lu_device_is_md(o->lo_dev)); - return container_of(o, struct md_object, mo_lu); + return container_of(o, struct md_object, mo_lu); } static inline struct md_device *md_device_get(struct md_object *o) { LASSERT(lu_device_is_md(o->mo_lu.lo_dev)); - return container_of(o->mo_lu.lo_dev, struct md_device, md_lu_dev); + return container_of(o->mo_lu.lo_dev, struct md_device, md_lu_dev); } struct mdt_object { - struct lu_object_header mot_header; - struct md_object mot_obj; + struct lu_object_header mot_header; + struct md_object mot_obj; }; struct mdt_lock_handle { - struct lustre_handle mlh_lh; + struct lustre_handle mlh_lh; ldlm_mode_t mlh_mode; }; @@ -95,19 +95,19 @@ void mdt_lock_handle_init(struct mdt_lock_handle *lh); void mdt_lock_handle_fini(struct mdt_lock_handle *lh); struct mdd_object { - struct md_object mod_obj; + struct md_object mod_obj; }; struct osd_object { - struct lu_object oo_lu; - struct dentry *oo_dentry; + struct lu_object oo_lu; + struct dentry *oo_dentry; }; int md_device_init(struct md_device *md, struct lu_device_type *t); void md_device_fini(struct md_device *md); enum { - MDT_REP_BUF_NR_MAX = 8 + MDT_REP_BUF_NR_MAX = 8 }; enum { @@ -121,28 +121,28 @@ enum { * reduce stack consumption. */ struct mdt_thread_info { - struct mdt_device *mti_mdt; - /* - * number of buffers in reply message. - */ - int mti_rep_buf_nr; - /* - * sizes of reply buffers. - */ - int mti_rep_buf_size[MDT_REP_BUF_NR_MAX]; - /* - * Body for "habeo corpus" operations. - */ - struct mds_body *mti_body; - /* - * Host object. This is released at the end of mdt_handler(). - */ - struct mdt_object *mti_object; - /* - * Additional fail id that can be set by handler. Passed to - * target_send_reply(). - */ - int mti_fail_id; + struct mdt_device *mti_mdt; + /* + * number of buffers in reply message. + */ + int mti_rep_buf_nr; + /* + * sizes of reply buffers. + */ + int mti_rep_buf_size[MDT_REP_BUF_NR_MAX]; + /* + * Body for "habeo corpus" operations. + */ + struct mds_body *mti_body; + /* + * Host object. This is released at the end of mdt_handler(). + */ + struct mdt_object *mti_object; + /* + * Additional fail id that can be set by handler. Passed to + * target_send_reply(). + */ + int mti_fail_id; /* * A couple of lock handles. */ diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index cd9fb32..7c62010 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -62,26 +62,28 @@ */ unsigned long mdt_num_threads; +static int mdt_handle(struct ptlrpc_request *req); + static int mdt_getstatus(struct mdt_thread_info *info, - struct ptlrpc_request *req, int offset) + struct ptlrpc_request *req, int offset) { struct md_device *mdd = info->mti_mdt->mdt_child; - struct mds_body *body; - int size = sizeof *body; - int result; + struct mds_body *body; + int size = sizeof *body; + int result; ENTRY; result = lustre_pack_reply(req, 1, &size, NULL); - if (result) + if (result) CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n", - size); + size); else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) result = -ENOMEM; else { - body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body); - result = mdd->md_ops->mdo_root_get(mdd, &body->fid1); - } + body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body); + result = mdd->md_ops->mdo_root_get(mdd, &body->fid1); + } /* the last_committed and last_xid fields are filled in for all * replies already - no need to do so here also. @@ -92,7 +94,7 @@ static int mdt_getstatus(struct mdt_thread_info *info, static int mdt_connect(struct mdt_thread_info *info, struct ptlrpc_request *req, int offset) { - return -EOPNOTSUPP; + return target_handle_connect(req, mdt_handle); } static int mdt_disconnect(struct mdt_thread_info *info, @@ -200,32 +202,32 @@ static struct lu_device_operations mdt_lu_ops; static int lu_device_is_mdt(struct lu_device *d) { - /* - * XXX for now. Tags in lu_device_type->ldt_something are needed. - */ - return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops); + /* + * XXX for now. Tags in lu_device_type->ldt_something are needed. + */ + return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops); } static struct mdt_object *mdt_obj(struct lu_object *o) { - LASSERT(lu_device_is_mdt(o->lo_dev)); - return container_of(o, struct mdt_object, mot_obj.mo_lu); + LASSERT(lu_device_is_mdt(o->lo_dev)); + return container_of(o, struct mdt_object, mot_obj.mo_lu); } struct mdt_object *mdt_object_find(struct mdt_device *d, struct ll_fid *f) { - struct lu_object *o; + struct lu_object *o; - o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f); - if (IS_ERR(o)) - return (struct mdt_object *)o; - else - return mdt_obj(o); + o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f); + if (IS_ERR(o)) + return (struct mdt_object *)o; + else + return mdt_obj(o); } void mdt_object_put(struct mdt_object *o) { - lu_object_put(&o->mot_obj.mo_lu); + lu_object_put(&o->mot_obj.mo_lu); } static struct ll_fid *mdt_object_fid(struct mdt_object *o) @@ -268,19 +270,19 @@ struct mdt_object *mdt_object_find_lock(struct mdt_device *d, struct ll_fid *f, } struct mdt_handler { - const char *mh_name; - int mh_fail_id; - __u32 mh_opc; - __u32 mh_flags; - int (*mh_act)(struct mdt_thread_info *info, + const char *mh_name; + int mh_fail_id; + __u32 mh_opc; + __u32 mh_flags; + int (*mh_act)(struct mdt_thread_info *info, struct ptlrpc_request *req, int offset); }; enum mdt_handler_flags { - /* - * struct mds_body is passed in the 0-th incoming buffer. - */ - HABEO_CORPUS = (1 << 0) + /* + * struct mds_body is passed in the 0-th incoming buffer. + */ + HABEO_CORPUS = (1 << 0) }; struct mdt_opc_slice { @@ -293,66 +295,88 @@ static struct mdt_opc_slice mdt_handlers[]; struct mdt_handler *mdt_handler_find(__u32 opc) { - struct mdt_opc_slice *s; - struct mdt_handler *h; + struct mdt_opc_slice *s; + struct mdt_handler *h; + + h = NULL; + for (s = mdt_handlers; s->mos_hs != NULL; s++) { + if (s->mos_opc_start <= opc && opc < s->mos_opc_end) { + h = s->mos_hs + (opc - s->mos_opc_start); + if (h->mh_opc != 0) + LASSERT(h->mh_opc == opc); + else + h = NULL; /* unsupported opc */ + break; + } + } + return h; +} - h = NULL; - for (s = mdt_handlers; s->mos_hs != NULL; s++) { - if (s->mos_opc_start <= opc && opc < s->mos_opc_end) { - h = s->mos_hs + (opc - s->mos_opc_start); - if (h->mh_opc != 0) - LASSERT(h->mh_opc == opc); - else - h = NULL; /* unsupported opc */ - break; - } - } - return h; +static inline __u64 req_exp_last_xid(struct ptlrpc_request *req) +{ + return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid; } +/* + * Invoke handler for this request opc. Also do necessary preprocessing + * (according to handler ->mh_flags), and post-processing (setting of + * ->last_{xid,committed}). + */ static int mdt_req_handle(struct mdt_thread_info *info, - struct mdt_handler *h, struct ptlrpc_request *req, - int shift) + struct mdt_handler *h, struct ptlrpc_request *req, + int shift) { - int result; + int result; int off; - ENTRY; + ENTRY; - LASSERT(h->mh_act != NULL); - LASSERT(h->mh_opc == req->rq_reqmsg->opc); + LASSERT(h->mh_act != NULL); + LASSERT(h->mh_opc == req->rq_reqmsg->opc); + LASSERT(current->journal_info == NULL); - DEBUG_REQ(D_INODE, req, "%s", h->mh_name); + DEBUG_REQ(D_INODE, req, "%s", h->mh_name); - if (h->mh_fail_id != 0) - OBD_FAIL_RETURN(h->mh_fail_id, 0); + if (h->mh_fail_id != 0) + OBD_FAIL_RETURN(h->mh_fail_id, 0); - off = MDS_REQ_REC_OFF + shift; + off = MDS_REQ_REC_OFF + shift; result = 0; - if (h->mh_flags & HABEO_CORPUS) { - info->mti_body = lustre_swab_reqbuf(req, off, + if (h->mh_flags & HABEO_CORPUS) { + info->mti_body = lustre_swab_reqbuf(req, off, sizeof *info->mti_body, - lustre_swab_mds_body); - if (info->mti_body == NULL) { - CERROR("Can't unpack body\n"); - result = req->rq_status = -EFAULT; - } - info->mti_object = mdt_object_find(info->mti_mdt, - &info->mti_body->fid1); - if (IS_ERR(info->mti_object)) - result = PTR_ERR(info->mti_object); - } - if (result == 0) - result = h->mh_act(info, req, off); - /* - * XXX result value is unconditionally shoved into ->rq_status - * (original code sometimes placed error code into ->rq_status, and - * sometimes returned it to the - * caller). ptlrpc_server_handle_request() doesn't check return value - * anyway. - */ - req->rq_status = result; - RETURN(result); + lustre_swab_mds_body); + if (info->mti_body == NULL) { + CERROR("Can't unpack body\n"); + result = req->rq_status = -EFAULT; + } + info->mti_object = mdt_object_find(info->mti_mdt, + &info->mti_body->fid1); + if (IS_ERR(info->mti_object)) + result = PTR_ERR(info->mti_object); + } + if (result == 0) + /* + * Process request. + */ + result = h->mh_act(info, req, off); + /* + * XXX result value is unconditionally shoved into ->rq_status + * (original code sometimes placed error code into ->rq_status, and + * sometimes returned it to the + * caller). ptlrpc_server_handle_request() doesn't check return value + * anyway. + */ + req->rq_status = result; + + LASSERT(current->journal_info == NULL); + + /* If we're DISCONNECTing, the mds_export_data is already freed */ + if (result == 0 && h->mh_opc != MDS_DISCONNECT) { + req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req)); + target_committed_to_req(req); + } + RETURN(result); } void mdt_lock_handle_init(struct mdt_lock_handle *lh) @@ -370,13 +394,13 @@ static void mdt_thread_info_init(struct mdt_thread_info *info) { int i; - memset(info, 0, sizeof *info); - info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET; - /* - * Poison size array. - */ - for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++) - info->mti_rep_buf_size[i] = ~0; + memset(info, 0, sizeof *info); + info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET; + /* + * Poison size array. + */ + for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++) + info->mti_rep_buf_size[i] = ~0; info->mti_rep_buf_nr = i; for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++) mdt_lock_handle_init(&info->mti_lh[i]); @@ -386,10 +410,10 @@ static void mdt_thread_info_fini(struct mdt_thread_info *info) { int i; - if (info->mti_object != NULL) { - mdt_object_put(info->mti_object); - info->mti_object = NULL; - } + if (info->mti_object != NULL) { + mdt_object_put(info->mti_object); + info->mti_object = NULL; + } for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++) mdt_lock_handle_fini(&info->mti_lh[i]); } @@ -462,110 +486,170 @@ static int mds_msg_check_version(struct lustre_msg *msg) return rc; } -static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info) +static int mdt_filter_recovery_request(struct ptlrpc_request *req, + struct obd_device *obd, int *process) { - int rc; - struct mds_obd *mds = NULL; /* quell gcc overwarning */ - struct obd_device *obd = NULL; - struct mdt_handler *h; + switch (req->rq_reqmsg->opc) { + case MDS_CONNECT: /* This will never get here, but for completeness. */ + case OST_CONNECT: /* This will never get here, but for completeness. */ + case MDS_DISCONNECT: + case OST_DISCONNECT: + *process = 1; + RETURN(0); + + case MDS_CLOSE: + case MDS_SYNC: /* used in unmounting */ + case OBD_PING: + case MDS_REINT: + case LDLM_ENQUEUE: + *process = target_queue_recovery_request(req, obd); + RETURN(0); + + default: + DEBUG_REQ(D_ERROR, req, "not permitted during recovery"); + *process = 0; + /* XXX what should we set rq_status to here? */ + req->rq_status = -EAGAIN; + RETURN(ptlrpc_error(req)); + } +} + +/* + * Handle recovery. Return: + * +ve: continue request processing; + * -ve: abort immediately with given (negated) error code; + * 0: send reply with error code in req->rq_status; + */ +static int mdt_recovery(struct ptlrpc_request *req) +{ + int recovering; + int abort_recovery; + struct obd_device *obd; ENTRY; - OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0); + if (req->rq_reqmsg->opc == MDS_CONNECT) + RETURN(+1); - LASSERT(current->journal_info == NULL); + if (req->rq_export == NULL) { + CERROR("operation %d on unconnected MDS from %s\n", + req->rq_reqmsg->opc, + libcfs_id2str(req->rq_peer)); + req->rq_status = -ENOTCONN; + RETURN(0); + } - rc = mds_msg_check_version(req->rq_reqmsg); - if (rc) { - CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n"); - RETURN(rc); + /* sanity check: if the xid matches, the request must be marked as a + * resent or replayed */ + LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req), + lustre_msg_get_flags(req->rq_reqmsg) & + (MSG_RESENT | MSG_REPLAY)), + "rq_xid "LPU64" matches last_xid, " + "expected RESENT flag\n", req->rq_xid); + + /* else: note the opposite is not always true; a RESENT req after a + * failover will usually not match the last_xid, since it was likely + * never committed. A REPLAYed request will almost never match the + * last xid, however it could for a committed, but still retained, + * open. */ + + obd = req->rq_export->exp_obd; + + /* Check for aborted recovery... */ + spin_lock_bh(&obd->obd_processing_task_lock); + abort_recovery = obd->obd_abort_recovery; + recovering = obd->obd_recovering; + spin_unlock_bh(&obd->obd_processing_task_lock); + if (abort_recovery) { + target_abort_recovery(obd); + } else if (recovering) { + int rc; + int should_process; + + rc = mdt_filter_recovery_request(req, obd, &should_process); + if (rc != 0 || !should_process) { + LASSERT(rc < 0); + RETURN(rc); + } } + RETURN(+1); +} - if (req->rq_reqmsg->opc != MDS_CONNECT) { - struct mds_export_data *med; +static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info) +{ + struct obd_device *obd; + + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) { + if (req->rq_reqmsg->opc != OBD_PING) + DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY"); - if (req->rq_export == NULL) { - CERROR("operation %d on unconnected MDS from %s\n", - req->rq_reqmsg->opc, - libcfs_id2str(req->rq_peer)); + obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL; + if (obd && obd->obd_recovering) { + DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply"); + RETURN(target_queue_final_reply(req, req->rq_status)); + } else { + /* Lost a race with recovery; let the error path + * DTRT. */ req->rq_status = -ENOTCONN; - GOTO(out, rc = -ENOTCONN); } - - med = &req->rq_export->exp_mds_data; - obd = req->rq_export->exp_obd; - mds = &obd->u.mds; - - /* sanity check: if the xid matches, the request must - * be marked as a resent or replayed */ - LASSERTF(ergo(req->rq_xid == med->med_mcd->mcd_last_xid, - lustre_msg_get_flags(req->rq_reqmsg) & - (MSG_RESENT | MSG_REPLAY)), - "rq_xid "LPU64" matches last_xid, " - "expected RESENT flag\n", req->rq_xid); - /* else: note the opposite is not always true; a - * RESENT req after a failover will usually not match - * the last_xid, since it was likely never - * committed. A REPLAYed request will almost never - * match the last xid, however it could for a - * committed, but still retained, open. */ - - /* Check for aborted recovery... */ } + target_send_reply(req, req->rq_status, info->mti_fail_id); + RETURN(req->rq_status); +} + +static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info) +{ + struct lustre_msg *msg; + int result; - h = mdt_handler_find(req->rq_reqmsg->opc); - if (h != NULL) { - rc = mdt_req_handle(info, h, req, 0); - } else { - req->rq_status = -ENOTSUPP; - rc = ptlrpc_error(req); - RETURN(rc); - } + ENTRY; - LASSERT(current->journal_info == NULL); + OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0); - /* If we're DISCONNECTing, the mds_export_data is already freed */ - if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) { - struct mds_export_data *med = &req->rq_export->exp_mds_data; - req->rq_repmsg->last_xid = - le64_to_cpu(med->med_mcd->mcd_last_xid); + LASSERT(current->journal_info == NULL); - target_committed_to_req(req); + msg = req->rq_reqmsg; + result = mds_msg_check_version(msg); + if (result != 0) { + CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n"); + RETURN(result); } - EXIT; - out: - - if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) { - if (obd && obd->obd_recovering) { - DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply"); - RETURN(target_queue_final_reply(req, rc)); + result = mdt_recovery(req); + if (result > 0) { + struct mdt_handler *h; + + h = mdt_handler_find(msg->opc); + if (h != NULL) + result = mdt_req_handle(info, h, req, 0); + else { + req->rq_status = -ENOTSUPP; + result = ptlrpc_error(req); + RETURN(result); } - /* Lost a race with recovery; let the error path DTRT. */ - rc = req->rq_status = -ENOTCONN; - } + } else if (result < 0) + RETURN(result); - target_send_reply(req, rc, info->mti_fail_id); - RETURN(0); + RETURN(mdt_reply(req, info)); } static struct mdt_device *mdt_dev(struct lu_device *d) { - LASSERT(lu_device_is_mdt(d)); - return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev); + LASSERT(lu_device_is_mdt(d)); + return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev); } -int mdt_handle(struct ptlrpc_request *req) +static int mdt_handle(struct ptlrpc_request *req) { - int result; + int result; - struct mdt_thread_info info; /* XXX on stack for now */ - mdt_thread_info_init(&info); - info.mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev); + struct mdt_thread_info info; /* XXX on stack for now */ + mdt_thread_info_init(&info); + info.mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev); - result = mdt_handle0(req, &info); + result = mdt_handle0(req, &info); - mdt_thread_info_fini(&info); + mdt_thread_info_fini(&info); return result; } @@ -573,89 +657,89 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, void *req_cookie, ldlm_mode_t mode, int flags, void *data) { - RETURN(ELDLM_LOCK_ABORTED); + return ELDLM_LOCK_ABORTED; } struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c, - svc_handler_t h, char *name, - struct proc_dir_entry *proc_entry, - svcreq_printfn_t prntfn) + svc_handler_t h, char *name, + struct proc_dir_entry *proc_entry, + svcreq_printfn_t prntfn) { - return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize, - c->psc_max_req_size, c->psc_max_reply_size, - c->psc_req_portal, c->psc_rep_portal, - c->psc_watchdog_timeout, - h, name, proc_entry, - prntfn, c->psc_num_threads); + return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize, + c->psc_max_req_size, c->psc_max_reply_size, + c->psc_req_portal, c->psc_rep_portal, + c->psc_watchdog_timeout, + h, name, proc_entry, + prntfn, c->psc_num_threads); } int md_device_init(struct md_device *md, struct lu_device_type *t) { - return lu_device_init(&md->md_lu_dev, t); + return lu_device_init(&md->md_lu_dev, t); } void md_device_fini(struct md_device *md) { - lu_device_fini(&md->md_lu_dev); + lu_device_fini(&md->md_lu_dev); } static void mdt_fini(struct lu_device *d) { - struct mdt_device *m = mdt_dev(d); - - if (d->ld_site != NULL) { - lu_site_fini(d->ld_site); - d->ld_site = NULL; - } - if (m->mdt_service != NULL) { - ptlrpc_unregister_service(m->mdt_service); - m->mdt_service = NULL; - } - if (m->mdt_namespace != NULL) { - ldlm_namespace_free(m->mdt_namespace, 0); - m->mdt_namespace = NULL; - } - - LASSERT(atomic_read(&d->ld_ref) == 0); - md_device_fini(&m->mdt_md_dev); + struct mdt_device *m = mdt_dev(d); + + if (d->ld_site != NULL) { + lu_site_fini(d->ld_site); + d->ld_site = NULL; + } + if (m->mdt_service != NULL) { + ptlrpc_unregister_service(m->mdt_service); + m->mdt_service = NULL; + } + if (m->mdt_namespace != NULL) { + ldlm_namespace_free(m->mdt_namespace, 0); + m->mdt_namespace = NULL; + } + + LASSERT(atomic_read(&d->ld_ref) == 0); + md_device_fini(&m->mdt_md_dev); } static int mdt_init0(struct mdt_device *m, struct lu_device_type *t, struct lustre_cfg *cfg) { - struct lu_site *s; + struct lu_site *s; char ns_name[48]; ENTRY; - OBD_ALLOC_PTR(s); - if (s == NULL) - return -ENOMEM; - - md_device_init(&m->mdt_md_dev, t); - - m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops; - - m->mdt_service_conf.psc_nbufs = MDS_NBUFS; - m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE; - m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE; - m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE; - m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL; - m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL; - m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT; - /* - * We'd like to have a mechanism to set this on a per-device basis, - * but alas... - */ - m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads, + OBD_ALLOC_PTR(s); + if (s == NULL) + return -ENOMEM; + + md_device_init(&m->mdt_md_dev, t); + + m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops; + + m->mdt_service_conf.psc_nbufs = MDS_NBUFS; + m->mdt_service_conf.psc_bufsize = MDS_BUFSIZE; + m->mdt_service_conf.psc_max_req_size = MDS_MAXREQSIZE; + m->mdt_service_conf.psc_max_reply_size = MDS_MAXREPSIZE; + m->mdt_service_conf.psc_req_portal = MDS_REQUEST_PORTAL; + m->mdt_service_conf.psc_rep_portal = MDC_REPLY_PORTAL; + m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT; + /* + * We'd like to have a mechanism to set this on a per-device basis, + * but alas... + */ + m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS), - MDT_MAX_THREADS); - lu_site_init(s, &m->mdt_md_dev.md_lu_dev); + MDT_MAX_THREADS); + lu_site_init(s, &m->mdt_md_dev.md_lu_dev); snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m); m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER); if (m->mdt_namespace == NULL) - return -ENOMEM; + return -ENOMEM; ldlm_register_intent(m->mdt_namespace, mdt_intent_policy); ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, @@ -666,54 +750,54 @@ static int mdt_init0(struct mdt_device *m, LUSTRE_MDT0_NAME, m->mdt_md_dev.md_lu_dev.ld_proc_entry, NULL); - if (m->mdt_service == NULL) - return -ENOMEM; + if (m->mdt_service == NULL) + return -ENOMEM; - return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME); + return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME); } struct lu_object *mdt_object_alloc(struct lu_device *d) { - struct mdt_object *mo; + struct mdt_object *mo; - OBD_ALLOC_PTR(mo); - if (mo != NULL) { - struct lu_object *o; - struct lu_object_header *h; + OBD_ALLOC_PTR(mo); + if (mo != NULL) { + struct lu_object *o; + struct lu_object_header *h; - o = &mo->mot_obj.mo_lu; - h = &mo->mot_header; - lu_object_header_init(h); - lu_object_init(o, h, d); - /* ->lo_depth and ->lo_flags are automatically 0 */ - lu_object_add_top(h, o); - return o; - } else - return NULL; + o = &mo->mot_obj.mo_lu; + h = &mo->mot_header; + lu_object_header_init(h); + lu_object_init(o, h, d); + /* ->lo_depth and ->lo_flags are automatically 0 */ + lu_object_add_top(h, o); + return o; + } else + return NULL; } int mdt_object_init(struct lu_object *o) { - struct mdt_device *d = mdt_dev(o->lo_dev); - struct lu_device *under; - struct lu_object *below; + struct mdt_device *d = mdt_dev(o->lo_dev); + struct lu_device *under; + struct lu_object *below; - under = &d->mdt_child->md_lu_dev; - below = under->ld_ops->ldo_object_alloc(under); - if (below != NULL) { - lu_object_add(o, below); - return 0; - } else - return -ENOMEM; + under = &d->mdt_child->md_lu_dev; + below = under->ld_ops->ldo_object_alloc(under); + if (below != NULL) { + lu_object_add(o, below); + return 0; + } else + return -ENOMEM; } void mdt_object_free(struct lu_object *o) { - struct lu_object_header *h; + struct lu_object_header *h; - h = o->lo_header; - lu_object_fini(o); - lu_object_header_fini(h); + h = o->lo_header; + lu_object_fini(o); + lu_object_header_fini(h); } void mdt_object_release(struct lu_object *o) @@ -722,15 +806,15 @@ void mdt_object_release(struct lu_object *o) int mdt_object_print(struct seq_file *f, const struct lu_object *o) { - return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o); + return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o); } static struct lu_device_operations mdt_lu_ops = { - .ldo_object_alloc = mdt_object_alloc, - .ldo_object_init = mdt_object_init, - .ldo_object_free = mdt_object_free, - .ldo_object_release = mdt_object_release, - .ldo_object_print = mdt_object_print + .ldo_object_alloc = mdt_object_alloc, + .ldo_object_init = mdt_object_init, + .ldo_object_free = mdt_object_free, + .ldo_object_release = mdt_object_release, + .ldo_object_print = mdt_object_print }; struct md_object *mdt_object_child(struct mdt_object *o) @@ -746,16 +830,16 @@ static inline struct md_device_operations *mdt_child_ops(struct mdt_device *d) int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d, struct ll_fid *pfid, const char *name, struct ll_fid *cfid) { - struct mdt_object *o; - struct mdt_object *child; + struct mdt_object *o; + struct mdt_object *child; struct mdt_lock_handle *lh; - int result; + int result; (lh = &info->mti_lh[MDT_LH_PARENT])->mlh_mode = LCK_PW; - o = mdt_object_find_lock(d, pfid, lh); - if (IS_ERR(o)) + o = mdt_object_find_lock(d, pfid, lh); + if (IS_ERR(o)) return PTR_ERR(o); child = mdt_object_find(d, cfid); @@ -767,7 +851,7 @@ int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d, result = PTR_ERR(child); mdt_object_unlock(o, lh); mdt_object_put(o); - return result; + return result; } static struct obd_ops mdt_obd_device_ops = { @@ -852,7 +936,7 @@ static int __init mdt_mod_init(void) if (result != 0) class_unregister_type(LUSTRE_MDT0_NAME); } - return result; + return result; } static void __exit mdt_mod_exit(void) @@ -861,35 +945,35 @@ static void __exit mdt_mod_exit(void) } -#define DEF_HNDL(prefix, base, flags, opc, fn) \ -[prefix ## _ ## opc - prefix ## _ ## base] = { \ - .mh_name = #opc, \ - .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## _NET, \ - .mh_opc = prefix ## _ ## opc, \ - .mh_flags = flags, \ - .mh_act = fn \ +#define DEF_HNDL(prefix, base, flags, opc, fn) \ +[prefix ## _ ## opc - prefix ## _ ## base] = { \ + .mh_name = #opc, \ + .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## _NET, \ + .mh_opc = prefix ## _ ## opc, \ + .mh_flags = flags, \ + .mh_act = fn \ } #define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(MDS, GETATTR, flags, name, fn) static struct mdt_handler mdt_mds_ops[] = { - DEF_MDT_HNDL(0, CONNECT, mdt_connect), - DEF_MDT_HNDL(0, DISCONNECT, mdt_disconnect), - DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus), - DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mdt_getattr), - DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mdt_getattr_name), - DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mdt_setxattr), - DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mdt_getxattr), - DEF_MDT_HNDL(0, STATFS, mdt_statfs), - DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mdt_readpage), - DEF_MDT_HNDL(0, REINT, mdt_reint), - DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mdt_close), - DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing), - DEF_MDT_HNDL(0, PIN, mdt_pin), - DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync), - DEF_MDT_HNDL(0, SET_INFO, mdt_set_info), - DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck), - DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl), + DEF_MDT_HNDL(0, CONNECT, mdt_connect), + DEF_MDT_HNDL(0, DISCONNECT, mdt_disconnect), + DEF_MDT_HNDL(0, GETSTATUS, mdt_getstatus), + DEF_MDT_HNDL(HABEO_CORPUS, GETATTR, mdt_getattr), + DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME, mdt_getattr_name), + DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR, mdt_setxattr), + DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR, mdt_getxattr), + DEF_MDT_HNDL(0, STATFS, mdt_statfs), + DEF_MDT_HNDL(HABEO_CORPUS, READPAGE, mdt_readpage), + DEF_MDT_HNDL(0, REINT, mdt_reint), + DEF_MDT_HNDL(HABEO_CORPUS, CLOSE, mdt_close), + DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING, mdt_done_writing), + DEF_MDT_HNDL(0, PIN, mdt_pin), + DEF_MDT_HNDL(HABEO_CORPUS, SYNC, mdt_sync), + DEF_MDT_HNDL(0, SET_INFO, mdt_set_info), + DEF_MDT_HNDL(0, QUOTACHECK, mdt_handle_quotacheck), + DEF_MDT_HNDL(0, QUOTACTL, mdt_handle_quotactl), }; static struct mdt_handler mdt_obd_ops[] = { @@ -902,28 +986,28 @@ static struct mdt_handler mdt_llog_ops[] = { }; static struct mdt_opc_slice mdt_handlers[] = { - { - .mos_opc_start = MDS_GETATTR, - .mos_opc_end = MDS_LAST_OPC, - .mos_hs = mdt_mds_ops - }, - { - .mos_opc_start = OBD_PING, - .mos_opc_end = OBD_LAST_OPC, - .mos_hs = mdt_obd_ops - }, - { - .mos_opc_start = LDLM_ENQUEUE, - .mos_opc_end = LDLM_LAST_OPC, - .mos_hs = mdt_dlm_ops - }, - { - .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE, - .mos_opc_end = LLOG_LAST_OPC, - .mos_hs = mdt_llog_ops - }, { - .mos_hs = NULL + .mos_opc_start = MDS_GETATTR, + .mos_opc_end = MDS_LAST_OPC, + .mos_hs = mdt_mds_ops + }, + { + .mos_opc_start = OBD_PING, + .mos_opc_end = OBD_LAST_OPC, + .mos_hs = mdt_obd_ops + }, + { + .mos_opc_start = LDLM_ENQUEUE, + .mos_opc_end = LDLM_LAST_OPC, + .mos_hs = mdt_dlm_ops + }, + { + .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE, + .mos_opc_end = LLOG_LAST_OPC, + .mos_hs = mdt_llog_ops + }, + { + .mos_hs = NULL } }; -- 1.8.3.1