From 929ec628e6fef5609e55d519a1eb9e2cbbf1f1e8 Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Thu, 6 Dec 2012 09:03:27 -0800 Subject: [PATCH] LU-1710 lvb: variable sized LVB support Originally, the LVB on wire is fixed sized only for transferring object {m/a/c}time and block/size information. But with introducing layout lock, new quota and nanosecond features in the near futures, such fixed sized on wire LVB structure cannot match the requirements from these new features. So change on wire protocol to allow variable sized LVB transerred between nodes. Signed-off-by: Fan Yong Signed-off-by: Jinshan Xiong Change-Id: Ifcd2d303495d2b3d1c09c38af823d246fff196fa Reviewed-on: http://review.whamcloud.com/3965 Tested-by: Hudson Reviewed-by: Johann Lombardi Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/lustre/lustre_idl.h | 44 ++++++++----- lustre/include/lustre_dlm.h | 14 ++++- lustre/include/lustre_export.h | 21 +++++++ lustre/include/lustre_req_layout.h | 7 ++- lustre/ldlm/ldlm_flock.c | 6 +- lustre/ldlm/ldlm_internal.h | 4 +- lustre/ldlm/ldlm_lock.c | 121 ++++++++++++++++++++++++++++++++--- lustre/ldlm/ldlm_lockd.c | 88 +++++++++++++------------- lustre/ldlm/ldlm_request.c | 125 +++++++++++++++++++------------------ lustre/liblustre/rw.c | 8 ++- lustre/liblustre/super.c | 11 ++-- lustre/llite/llite_lib.c | 4 +- lustre/mdc/mdc_locks.c | 31 +++++---- lustre/mdt/mdt_handler.c | 10 +++ lustre/mdt/mdt_internal.h | 2 +- lustre/mdt/mdt_reint.c | 6 +- lustre/mgc/mgc_request.c | 4 +- lustre/mgs/mgs_handler.c | 3 +- lustre/obdclass/obd_mount.c | 4 +- lustre/obdecho/echo.c | 4 +- lustre/obdecho/echo_client.c | 2 +- lustre/ofd/ofd_dlm.c | 1 + lustre/ofd/ofd_internal.h | 1 + lustre/ofd/ofd_lvb.c | 47 ++++++++++---- lustre/ofd/ofd_obd.c | 2 +- lustre/osc/osc_lock.c | 5 ++ lustre/osc/osc_request.c | 2 +- lustre/osp/osp_dev.c | 3 +- lustre/ost/ost_handler.c | 6 +- lustre/ptlrpc/layout.c | 16 +++-- lustre/ptlrpc/pack_generic.c | 43 +++++++++---- lustre/ptlrpc/wiretest.c | 46 +++++++++++++- lustre/quota/qmt_lock.c | 7 ++- lustre/quota/qsd_handler.c | 12 ++-- lustre/quota/qsd_internal.h | 6 +- lustre/quota/qsd_lock.c | 2 + lustre/quota/qsd_reint.c | 10 +-- lustre/quota/qsd_request.c | 12 ++-- lustre/utils/req-layout.c | 3 +- lustre/utils/wirecheck.c | 37 ++++++++--- lustre/utils/wiretest.c | 46 +++++++++++++- 41 files changed, 587 insertions(+), 239 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 394decc..2340e58 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1209,7 +1209,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); OBD_CONNECT_SOM | OBD_CONNECT_FULL20 | \ OBD_CONNECT_64BITHASH | OBD_CONNECT_JOBSTATS | \ OBD_CONNECT_EINPROGRESS | \ - OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_UMASK) + OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_UMASK | \ + OBD_CONNECT_LVB_TYPE) #define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \ OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \ OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \ @@ -1223,7 +1224,8 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); OBD_CONNECT_64BITHASH | OBD_CONNECT_MAXBYTES | \ OBD_CONNECT_MAX_EASIZE | \ OBD_CONNECT_EINPROGRESS | \ - OBD_CONNECT_JOBSTATS | OBD_CONNECT_LIGHTWEIGHT) + OBD_CONNECT_JOBSTATS | \ + OBD_CONNECT_LIGHTWEIGHT | OBD_CONNECT_LVB_TYPE) #define ECHO_CONNECT_SUPPORTED (0) #define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION | OBD_CONNECT_AT | \ OBD_CONNECT_FULL20 | OBD_CONNECT_IMP_RECOV | \ @@ -1592,14 +1594,30 @@ extern void lustre_swab_niobuf_remote (struct niobuf_remote *nbr); do { blocks = OST_LVB_ERR_INIT + rc; } while (0) #define OST_LVB_GET_ERR(blocks) (int)(blocks - OST_LVB_ERR_INIT) +struct ost_lvb_v1 { + __u64 lvb_size; + obd_time lvb_mtime; + obd_time lvb_atime; + obd_time lvb_ctime; + __u64 lvb_blocks; +}; + +extern void lustre_swab_ost_lvb_v1(struct ost_lvb_v1 *lvb); + struct ost_lvb { - __u64 lvb_size; - obd_time lvb_mtime; - obd_time lvb_atime; - obd_time lvb_ctime; - __u64 lvb_blocks; + __u64 lvb_size; + obd_time lvb_mtime; + obd_time lvb_atime; + obd_time lvb_ctime; + __u64 lvb_blocks; + __u32 lvb_mtime_ns; + __u32 lvb_atime_ns; + __u32 lvb_ctime_ns; + __u32 lvb_padding; }; +extern void lustre_swab_ost_lvb(struct ost_lvb *lvb); + /* * lquota data structures */ @@ -1766,6 +1784,8 @@ struct lquota_lvb { __u64 lvb_pad1; }; +extern void lustre_swab_lquota_lvb(struct lquota_lvb *lvb); + /* LVB used with global quota lock */ #define lvb_glb_ver lvb_id_may_rel /* current version of the global index */ @@ -2495,16 +2515,6 @@ typedef union { extern void lustre_swab_ldlm_policy_data (ldlm_wire_policy_data_t *d); -/* Similarly to ldlm_wire_policy_data_t, there is one common swabber for all - * LVB types. As a result, any new LVB structure must match the fields of the - * ost_lvb structure. */ -union ldlm_wire_lvb { - struct ost_lvb l_ost; - struct lquota_lvb l_lquota; -}; - -extern void lustre_swab_lvb(union ldlm_wire_lvb *); - union ldlm_gl_desc { struct ldlm_gl_lquota_desc lquota_desc; }; diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 3a19073..908c693 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -663,6 +663,13 @@ void ldlm_convert_policy_to_local(struct obd_export *exp, ldlm_type_t type, const ldlm_wire_policy_data_t *wpolicy, ldlm_policy_data_t *lpolicy); +enum lvb_type { + LVB_T_NONE = 0, + LVB_T_OST = 1, + LVB_T_LQUOTA = 2, + LVB_T_LAYOUT = 3, +}; + struct ldlm_lock { /** * Must be first in the structure. @@ -802,6 +809,7 @@ struct ldlm_lock { * Client-side-only members. */ + enum lvb_type l_lvb_type; /** * Temporary storage for an LVB received during an enqueue operation. */ @@ -1273,8 +1281,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, struct ldlm_enqueue_info *einfo, const struct ldlm_res_id *res_id, ldlm_policy_data_t const *policy, __u64 *flags, - void *lvb, __u32 lvb_len, struct lustre_handle *lockh, - int async); + void *lvb, __u32 lvb_len, enum lvb_type lvb_type, + struct lustre_handle *lockh, int async); int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req, cfs_list_t *cancels, @@ -1294,7 +1302,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_blocking_callback blocking, ldlm_completion_callback completion, ldlm_glimpse_callback glimpse, - void *data, __u32 lvb_len, + void *data, __u32 lvb_len, enum lvb_type lvb_type, const __u64 *client_cookie, struct lustre_handle *lockh); int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new, diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 3c08e6d..377f466 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -330,6 +330,27 @@ static inline int imp_connect_lru_resize(struct obd_import *imp) return !!(ocd->ocd_connect_flags & OBD_CONNECT_LRU_RESIZE); } +static inline bool exp_connect_lvb_type(struct obd_export *exp) +{ + LASSERT(exp != NULL); + if (exp->exp_connect_flags & OBD_CONNECT_LVB_TYPE) + return true; + else + return false; +} + +static inline bool imp_connect_lvb_type(struct obd_import *imp) +{ + struct obd_connect_data *ocd; + + LASSERT(imp != NULL); + ocd = &imp->imp_connect_data; + if (ocd->ocd_connect_flags & OBD_CONNECT_LVB_TYPE) + return true; + else + return false; +} + extern struct obd_export *class_conn2export(struct lustre_handle *conn); extern struct obd_device *class_conn2obd(struct lustre_handle *conn); diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index e46b903..c1c3c90 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -89,8 +89,8 @@ int req_capsule_server_pack(struct req_capsule *pill); void *req_capsule_client_get(struct req_capsule *pill, const struct req_msg_field *field); void *req_capsule_client_swab_get(struct req_capsule *pill, - const struct req_msg_field *field, - void (*swabber)(void*)); + const struct req_msg_field *field, + void *swabber); void *req_capsule_client_sized_get(struct req_capsule *pill, const struct req_msg_field *field, int len); @@ -102,6 +102,9 @@ void *req_capsule_server_sized_get(struct req_capsule *pill, void *req_capsule_server_swab_get(struct req_capsule *pill, const struct req_msg_field *field, void *swabber); +void *req_capsule_server_sized_swab_get(struct req_capsule *pill, + const struct req_msg_field *field, + int len, void *swabber); const void *req_capsule_other_get(struct req_capsule *pill, const struct req_msg_field *field); diff --git a/lustre/ldlm/ldlm_flock.c b/lustre/ldlm/ldlm_flock.c index f61e300..795cbac 100644 --- a/lustre/ldlm/ldlm_flock.c +++ b/lustre/ldlm/ldlm_flock.c @@ -417,9 +417,9 @@ reprocess: * and restart processing this lock. */ if (!new2) { unlock_res_and_lock(req); - new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK, - lock->l_granted_mode, &null_cbs, - NULL, 0); + new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK, + lock->l_granted_mode, &null_cbs, + NULL, 0, LVB_T_NONE); lock_res_and_lock(req); if (!new2) { ldlm_flock_destroy(req, lock->l_granted_mode, diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index f0a114a..70db50f 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -110,11 +110,13 @@ typedef enum { } ldlm_desc_ast_t; void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list); +int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill, + enum req_location loc, void *data, int size); struct ldlm_lock * ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *, ldlm_type_t type, ldlm_mode_t, const struct ldlm_callback_suite *cbs, - void *data, __u32 lvb_len); + void *data, __u32 lvb_len, enum lvb_type lvb_type); ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *, struct ldlm_lock **, void *cookie, __u64 *flags); void ldlm_lock_addref_internal(struct ldlm_lock *, __u32 mode); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 559bef6..fc7e89d 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1331,13 +1331,112 @@ out: } EXPORT_SYMBOL(ldlm_revalidate_lock_handle); +/* The caller's duty to guarantee the buffer is large enough. */ +int ldlm_fill_lvb(struct ldlm_lock *lock, struct req_capsule *pill, + enum req_location loc, void *data, int size) +{ + void *lvb; + ENTRY; + + LASSERT(data != NULL); + LASSERT(size >= 0); + + switch (lock->l_lvb_type) { + case LVB_T_OST: + if (size == sizeof(struct ost_lvb)) { + if (loc == RCL_CLIENT) + lvb = req_capsule_client_swab_get(pill, + &RMF_DLM_LVB, + lustre_swab_ost_lvb); + else + lvb = req_capsule_server_swab_get(pill, + &RMF_DLM_LVB, + lustre_swab_ost_lvb); + if (unlikely(lvb == NULL)) { + LDLM_ERROR(lock, "no LVB"); + RETURN(-EPROTO); + } + + memcpy(data, lvb, size); + } else if (size == sizeof(struct ost_lvb_v1)) { + struct ost_lvb *olvb = data; + + if (loc == RCL_CLIENT) + lvb = req_capsule_client_swab_get(pill, + &RMF_DLM_LVB, + lustre_swab_ost_lvb_v1); + else + lvb = req_capsule_server_sized_swab_get(pill, + &RMF_DLM_LVB, size, + lustre_swab_ost_lvb_v1); + if (unlikely(lvb == NULL)) { + LDLM_ERROR(lock, "no LVB"); + RETURN(-EPROTO); + } + + memcpy(data, lvb, size); + olvb->lvb_mtime_ns = 0; + olvb->lvb_atime_ns = 0; + olvb->lvb_ctime_ns = 0; + } else { + LDLM_ERROR(lock, "Replied unexpected ost LVB size %d", + size); + RETURN(-EINVAL); + } + break; + case LVB_T_LQUOTA: + if (size == sizeof(struct lquota_lvb)) { + if (loc == RCL_CLIENT) + lvb = req_capsule_client_swab_get(pill, + &RMF_DLM_LVB, + lustre_swab_lquota_lvb); + else + lvb = req_capsule_server_swab_get(pill, + &RMF_DLM_LVB, + lustre_swab_lquota_lvb); + if (unlikely(lvb == NULL)) { + LDLM_ERROR(lock, "no LVB"); + RETURN(-EPROTO); + } + + memcpy(data, lvb, size); + } else { + LDLM_ERROR(lock, "Replied unexpected lquota LVB size %d", + size); + RETURN(-EINVAL); + } + break; + case LVB_T_LAYOUT: + if (size == 0) + break; + + if (loc == RCL_CLIENT) + lvb = req_capsule_client_get(pill, &RMF_DLM_LVB); + else + lvb = req_capsule_server_get(pill, &RMF_DLM_LVB); + if (unlikely(lvb == NULL)) { + LDLM_ERROR(lock, "no LVB"); + RETURN(-EPROTO); + } + + memcpy(data, lvb, size); + break; + default: + LDLM_ERROR(lock, "Unexpected LVB type"); + RETURN(-EINVAL); + } + + RETURN(0); +} + /* Returns a referenced lock */ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *res_id, ldlm_type_t type, ldlm_mode_t mode, const struct ldlm_callback_suite *cbs, - void *data, __u32 lvb_len) + void *data, __u32 lvb_len, + enum lvb_type lvb_type) { struct ldlm_lock *lock; struct ldlm_resource *res; @@ -1377,6 +1476,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns, GOTO(out, 0); } + lock->l_lvb_type = lvb_type; if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_NEW_LOCK)) GOTO(out, 0); @@ -2129,7 +2229,8 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, libcfs_debug_vmsg2(msgdata, fmt, args, " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s " - "remote: "LPX64" expref: %d pid: %u timeout: %lu\n", + "remote: "LPX64" expref: %d pid: %u timeout: %lu " + "lvb_type: %d\n", lock, lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, @@ -2137,7 +2238,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, ldlm_lockname[lock->l_req_mode], lock->l_flags, nid, lock->l_remote_handle.cookie, exp ? cfs_atomic_read(&exp->exp_refcount) : -99, - lock->l_pid, lock->l_callback_timeout); + lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type); va_end(args); return; } @@ -2148,7 +2249,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64 "] (req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote:" - " "LPX64" expref: %d pid: %u timeout %lu\n", + " "LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n", ldlm_lock_to_ns_name(lock), lock, lock->l_handle.h_cookie, cfs_atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers, @@ -2163,7 +2264,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, lock->l_req_extent.start, lock->l_req_extent.end, lock->l_flags, nid, lock->l_remote_handle.cookie, exp ? cfs_atomic_read(&exp->exp_refcount) : -99, - lock->l_pid, lock->l_callback_timeout); + lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type); break; case LDLM_FLOCK: @@ -2194,7 +2295,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s " "flags: "LPX64" nid: %s remote: "LPX64" expref: %d " - "pid: %u timeout: %lu\n", + "pid: %u timeout: %lu lvb_type: %d\n", ldlm_lock_to_ns_name(lock), lock, lock->l_handle.h_cookie, cfs_atomic_read (&lock->l_refc), @@ -2208,15 +2309,15 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, ldlm_typename[resource->lr_type], lock->l_flags, nid, lock->l_remote_handle.cookie, exp ? cfs_atomic_read(&exp->exp_refcount) : -99, - lock->l_pid, lock->l_callback_timeout); + lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type); break; default: libcfs_debug_vmsg2(msgdata, fmt, args, " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s " "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" " - "nid: %s remote: "LPX64" expref: %d pid: %u timeout %lu" - "\n", + "nid: %s remote: "LPX64" expref: %d pid: %u timeout: %lu" + "lvb_type: %d\n", ldlm_lock_to_ns_name(lock), lock, lock->l_handle.h_cookie, cfs_atomic_read (&lock->l_refc), @@ -2229,7 +2330,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, ldlm_typename[resource->lr_type], lock->l_flags, nid, lock->l_remote_handle.cookie, exp ? cfs_atomic_read(&exp->exp_refcount) : -99, - lock->l_pid, lock->l_callback_timeout); + lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type); break; } va_end(args); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 346a6e7..7a32e1d 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -918,10 +918,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data) /* server namespace, doesn't need lock */ lvb_len = ldlm_lvbo_size(lock); - if (lvb_len > 0) - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, - lvb_len); - + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT, lvb_len); rc = ptlrpc_request_pack(req, LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK); if (rc) { ptlrpc_request_free(req); @@ -1235,8 +1232,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns, lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name, dlm_req->lock_desc.l_resource.lr_type, dlm_req->lock_desc.l_req_mode, - cbs, NULL, 0); - + cbs, NULL, 0, LVB_T_NONE); if (!lock) GOTO(out, rc = -ENOMEM); @@ -1636,6 +1632,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, { int lvb_len; CFS_LIST_HEAD(ast_list); + int rc = 0; ENTRY; LDLM_DEBUG(lock, "client completion callback handler START"); @@ -1652,28 +1649,34 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, } lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT); - if (lvb_len > 0) { + if (lvb_len < 0) { + LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", lvb_len); + GOTO(out, rc = lvb_len); + } else if (lvb_len > 0) { if (lock->l_lvb_len > 0) { /* for extent lock, lvb contains ost_lvb{}. */ LASSERT(lock->l_lvb_data != NULL); - LASSERTF(lock->l_lvb_len == lvb_len, - "preallocated %d, actual %d.\n", - lock->l_lvb_len, lvb_len); + + if (unlikely(lock->l_lvb_len < lvb_len)) { + LDLM_ERROR(lock, "Replied LVB is larger than " + "expectation, expected = %d, " + "replied = %d", + lock->l_lvb_len, lvb_len); + GOTO(out, rc = -EINVAL); + } } else { /* for layout lock, lvb has variable length */ void *lvb_data; OBD_ALLOC(lvb_data, lvb_len); - if (lvb_data == NULL) - LDLM_ERROR(lock, "no memory.\n"); - - lock_res_and_lock(lock); if (lvb_data == NULL) { - lock->l_flags |= LDLM_FL_FAILED; - } else { - LASSERT(lock->l_lvb_data == NULL); - lock->l_lvb_data = lvb_data; - lock->l_lvb_len = lvb_len; + LDLM_ERROR(lock, "No memory.\n"); + GOTO(out, rc = -ENOMEM); } + + lock_res_and_lock(lock); + LASSERT(lock->l_lvb_data == NULL); + lock->l_lvb_data = lvb_data; + lock->l_lvb_len = lvb_len; unlock_res_and_lock(lock); } } @@ -1684,9 +1687,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, /* bug 11300: the lock has already been granted */ unlock_res_and_lock(lock); LDLM_DEBUG(lock, "Double grant race happened"); - LDLM_LOCK_RELEASE(lock); - EXIT; - return; + GOTO(out, rc = 0); } /* If we receive the completion AST before the actual enqueue returned, @@ -1709,13 +1710,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, &lock->l_resource->lr_name, sizeof(lock->l_resource->lr_name)) != 0) { unlock_res_and_lock(lock); - if (ldlm_lock_change_resource(ns, lock, - &dlm_req->lock_desc.l_resource.lr_name) != 0) { - LDLM_ERROR(lock, "Failed to allocate resource"); - LDLM_LOCK_RELEASE(lock); - EXIT; - return; - } + rc = ldlm_lock_change_resource(ns, lock, + &dlm_req->lock_desc.l_resource.lr_name); + if (rc < 0) { + LDLM_ERROR(lock, "Failed to allocate resource"); + GOTO(out, rc); + } LDLM_DEBUG(lock, "completion AST, new resource"); CERROR("change resource!\n"); lock_res_and_lock(lock); @@ -1729,17 +1729,14 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, LDLM_DEBUG(lock, "completion AST includes blocking AST"); } - if (lock->l_lvb_len) { - if (req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, - RCL_CLIENT) < lock->l_lvb_len) { - LDLM_ERROR(lock, "completion AST did not contain " - "expected LVB!"); - } else { - void *lvb = req_capsule_client_get(&req->rq_pill, - &RMF_DLM_LVB); - memcpy(lock->l_lvb_data, lvb, lock->l_lvb_len); - } - } + if (lock->l_lvb_len > 0) { + rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT, + lock->l_lvb_data, lvb_len); + if (rc < 0) { + unlock_res_and_lock(lock); + GOTO(out, rc); + } + } ldlm_grant_lock(lock, &ast_list); unlock_res_and_lock(lock); @@ -1754,8 +1751,15 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)", lock); - LDLM_LOCK_RELEASE(lock); - EXIT; + GOTO(out, rc); + +out: + if (rc < 0) { + lock_res_and_lock(lock); + lock->l_flags |= LDLM_FL_FAILED; + unlock_res_and_lock(lock); + } + LDLM_LOCK_RELEASE(lock); } static void ldlm_handle_gl_callback(struct ptlrpc_request *req, diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 36bdc5a2..c5ea677 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -385,7 +385,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, ldlm_blocking_callback blocking, ldlm_completion_callback completion, ldlm_glimpse_callback glimpse, - void *data, __u32 lvb_len, + void *data, __u32 lvb_len, enum lvb_type lvb_type, const __u64 *client_cookie, struct lustre_handle *lockh) { @@ -403,7 +403,8 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, LBUG(); } - lock = ldlm_lock_create(ns, res_id, type, mode, &cbs, data, lvb_len); + lock = ldlm_lock_create(ns, res_id, type, mode, &cbs, data, lvb_len, + lvb_type); if (unlikely(!lock)) GOTO(out_nolock, err = -ENOMEM); @@ -493,8 +494,8 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, int is_replay = *flags & LDLM_FL_REPLAY; struct ldlm_lock *lock; struct ldlm_reply *reply; - struct ost_lvb *tmplvb; int cleanup_phase = 1; + int size = 0; ENTRY; lock = ldlm_handle2lock(lockh); @@ -504,35 +505,45 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, RETURN(-ENOLCK); } + LASSERTF(ergo(lvb_len != 0, lvb_len == lock->l_lvb_len), + "lvb_len = %d, l_lvb_len = %d\n", lvb_len, lock->l_lvb_len); + if (rc != ELDLM_OK) { LASSERT(!is_replay); LDLM_DEBUG(lock, "client-side enqueue END (%s)", rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED"); - if (rc == ELDLM_LOCK_ABORTED) { - /* Before we return, swab the reply */ - reply = req_capsule_server_get(&req->rq_pill, - &RMF_DLM_REP); - if (reply == NULL) - rc = -EPROTO; - if (lvb_len) { - - req_capsule_set_size(&req->rq_pill, - &RMF_DLM_LVB, RCL_SERVER, - lvb_len); - tmplvb = req_capsule_server_get(&req->rq_pill, - &RMF_DLM_LVB); - if (tmplvb == NULL) - GOTO(cleanup, rc = -EPROTO); - if (lvb != NULL) - memcpy(lvb, tmplvb, lvb_len); - } - } - GOTO(cleanup, rc); - } - reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); - if (reply == NULL) - GOTO(cleanup, rc = -EPROTO); + if (rc != ELDLM_LOCK_ABORTED) + GOTO(cleanup, rc); + } + + /* Before we return, swab the reply */ + reply = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP); + if (reply == NULL) + GOTO(cleanup, rc = -EPROTO); + + if (lvb_len != 0) { + LASSERT(lvb != NULL); + + size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, + RCL_SERVER); + if (size < 0) { + LDLM_ERROR(lock, "Fail to get lvb_len, rc = %d", size); + GOTO(cleanup, rc = size); + } else if (unlikely(size > lvb_len)) { + LDLM_ERROR(lock, "Replied LVB is larger than " + "expectation, expected = %d, replied = %d", + lvb_len, size); + GOTO(cleanup, rc = -EINVAL); + } + } + + if (rc == ELDLM_LOCK_ABORTED) { + if (lvb_len != 0) + rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER, + lvb, size); + GOTO(cleanup, rc = (rc != 0 ? rc : ELDLM_LOCK_ABORTED)); + } /* lock enqueued on the server */ cleanup_phase = 0; @@ -619,25 +630,18 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, /* If the lock has already been granted by a completion AST, don't * clobber the LVB with an older one. */ - if (lvb_len) { - /* We must lock or a racing completion might update lvb - without letting us know and we'll clobber the correct value. - Cannot unlock after the check either, a that still leaves - a tiny window for completion to get in */ - lock_res_and_lock(lock); - if (lock->l_req_mode != lock->l_granted_mode) { - - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, - RCL_SERVER, lvb_len); - tmplvb = req_capsule_server_get(&req->rq_pill, - &RMF_DLM_LVB); - if (tmplvb == NULL) { - unlock_res_and_lock(lock); - GOTO(cleanup, rc = -EPROTO); - } - memcpy(lock->l_lvb_data, tmplvb, lvb_len); - } - unlock_res_and_lock(lock); + if (lvb_len != 0) { + /* We must lock or a racing completion might update lvb without + * letting us know and we'll clobber the correct value. + * Cannot unlock after the check either, a that still leaves + * a tiny window for completion to get in */ + lock_res_and_lock(lock); + if (lock->l_req_mode != lock->l_granted_mode) + rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER, + lock->l_lvb_data, size); + unlock_res_and_lock(lock); + if (rc < 0) + GOTO(cleanup, rc); } if (!is_replay) { @@ -787,8 +791,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, struct ldlm_enqueue_info *einfo, const struct ldlm_res_id *res_id, ldlm_policy_data_t const *policy, __u64 *flags, - void *lvb, __u32 lvb_len, struct lustre_handle *lockh, - int async) + void *lvb, __u32 lvb_len, enum lvb_type lvb_type, + struct lustre_handle *lockh, int async) { struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; struct ldlm_lock *lock; @@ -817,7 +821,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, }; lock = ldlm_lock_create(ns, res_id, einfo->ei_type, einfo->ei_mode, &cbs, einfo->ei_cbdata, - lvb_len); + lvb_len, lvb_type); if (lock == NULL) RETURN(-ENOMEM); /* for the local lock, add the reference */ @@ -882,13 +886,12 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, /* Continue as normal. */ if (!req_passed_in) { - if (lvb_len > 0) { - req_capsule_extend(&req->rq_pill, - &RQF_LDLM_ENQUEUE_LVB); - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, - RCL_SERVER, lvb_len); - } - ptlrpc_request_set_replen(req); + if (lvb_len > 0) + req_capsule_extend(&req->rq_pill, + &RQF_LDLM_ENQUEUE_LVB); + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + lvb_len); + ptlrpc_request_set_replen(req); } /* @@ -2113,6 +2116,7 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) ldlm_lock_cancel(lock); RETURN(0); } + /* * If granted mode matches the requested mode, this lock is granted. * @@ -2149,11 +2153,10 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock) body->lock_flags = ldlm_flags_to_wire(flags); ldlm_lock2handle(lock, &body->lock_handle[0]); - if (lock->l_lvb_len != 0) { - req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB); - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, - lock->l_lvb_len); - } + if (lock->l_lvb_len > 0) + req_capsule_extend(&req->rq_pill, &RQF_LDLM_ENQUEUE_LVB); + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + lock->l_lvb_len); ptlrpc_request_set_replen(req); /* notify the server we've replayed all requests. * also, we mark the request to be put on a dedicated diff --git a/lustre/liblustre/rw.c b/lustre/liblustre/rw.c index 4991ca9..7a87c39 100644 --- a/lustre/liblustre/rw.c +++ b/lustre/liblustre/rw.c @@ -166,8 +166,12 @@ static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp) GOTO(iput, rc = -ELDLM_NO_LOCK_DATA); req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK); - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, - sizeof(*lvb)); + if (exp_connect_lvb_type(req->rq_export)) + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + sizeof(*lvb)); + else + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + sizeof(struct ost_lvb_v1)); rc = req_capsule_server_pack(&req->rq_pill); if (rc) { CERROR("failed pack reply: %d\n", rc); diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 81808cb..7511fa0 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -1346,8 +1346,9 @@ static int llu_file_flock(struct inode *ino, RETURN(rc = -ENODEV); if (lmv->tgts[0].ltd_exp != NULL) - rc = ldlm_cli_enqueue(lmv->tgts[0].ltd_exp, NULL, &einfo, &res_id, - &flock, &flags, NULL, 0, &lockh, 0); + rc = ldlm_cli_enqueue(lmv->tgts[0].ltd_exp, NULL, + &einfo, &res_id, &flock, &flags, + NULL, 0, LVB_T_NONE, &lockh, 0); else rc = -ENODEV; } @@ -1924,7 +1925,8 @@ llu_fsswop_mount(const char *source, ocd.ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_VERSION | OBD_CONNECT_FID | OBD_CONNECT_AT | - OBD_CONNECT_VBR | OBD_CONNECT_FULL20; + OBD_CONNECT_VBR | OBD_CONNECT_FULL20 | + OBD_CONNECT_LVB_TYPE; #ifdef LIBLUSTRE_POSIX_ACL ocd.ocd_connect_flags |= OBD_CONNECT_ACL; @@ -1962,7 +1964,8 @@ llu_fsswop_mount(const char *source, ocd.ocd_connect_flags = OBD_CONNECT_SRVLOCK | OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_FID | OBD_CONNECT_AT | - OBD_CONNECT_FULL20 | OBD_CONNECT_EINPROGRESS; + OBD_CONNECT_FULL20 | OBD_CONNECT_EINPROGRESS | + OBD_CONNECT_LVB_TYPE; ocd.ocd_version = LUSTRE_VERSION_CODE; err = obd_connect(NULL, &sbi->ll_dt_exp, obd, &sbi->ll_sb_uuid, &ocd, NULL); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index d805a0d..f54393f 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -221,7 +221,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, OBD_CONNECT_RMT_CLIENT | OBD_CONNECT_VBR | OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH| OBD_CONNECT_EINPROGRESS | - OBD_CONNECT_JOBSTATS; + OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE; if (sbi->ll_flags & LL_SBI_SOM_PREVIEW) data->ocd_connect_flags |= OBD_CONNECT_SOM; @@ -404,7 +404,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, OBD_CONNECT_FULL20 | OBD_CONNECT_64BITHASH | OBD_CONNECT_MAXBYTES | OBD_CONNECT_EINPROGRESS | - OBD_CONNECT_JOBSTATS; + OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE; if (sbi->ll_flags & LL_SBI_SOM_PREVIEW) data->ocd_connect_flags |= OBD_CONNECT_SOM; diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index 5226a05..13e45fd 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -437,7 +437,8 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, RETURN(req); } -static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp) +static struct ptlrpc_request * +mdc_enqueue_pack(struct obd_export *exp, int lvb_len) { struct ptlrpc_request *req; int rc; @@ -453,6 +454,7 @@ static struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp) RETURN(ERR_PTR(rc)); } + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len); ptlrpc_request_set_replen(req); RETURN(req); } @@ -635,8 +637,8 @@ static int mdc_finish_enqueue(struct obd_export *exp, void *lvb; void *lmm; - lvb = req_capsule_server_get(pill, - &RMF_DLM_LVB); + lvb = req_capsule_server_sized_get(pill, + &RMF_DLM_LVB, lvb_len); if (lvb == NULL) { LDLM_LOCK_PUT(lock); RETURN(-EPROTO); @@ -685,6 +687,7 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, ldlm_policy_data_t const *policy = &lookup_policy; int generation, resends = 0; struct ldlm_reply *lockrep; + enum lvb_type lvb_type = 0; ENTRY; LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n", @@ -719,13 +722,19 @@ resend: policy = &update_policy; einfo->ei_cbdata = NULL; lmm = NULL; - } else if (it->it_op & IT_UNLINK) - req = mdc_intent_unlink_pack(exp, it, op_data); - else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) + } else if (it->it_op & IT_UNLINK) { + req = mdc_intent_unlink_pack(exp, it, op_data); + } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) { req = mdc_intent_getattr_pack(exp, it, op_data); - else if (it->it_op & (IT_READDIR | IT_LAYOUT)) - req = ldlm_enqueue_pack(exp); - else { + } else if (it->it_op & IT_READDIR) { + req = mdc_enqueue_pack(exp, 0); + } else if (it->it_op & IT_LAYOUT) { + if (!imp_connect_lvb_type(class_exp2cliimp(exp))) + RETURN(-EOPNOTSUPP); + + req = mdc_enqueue_pack(exp, obddev->u.cli.cl_max_mds_easize); + lvb_type = LVB_T_LAYOUT; + } else { LBUG(); RETURN(-EINVAL); } @@ -759,7 +768,7 @@ resend: } rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL, - 0, lockh, 0); + 0, lvb_type, lockh, 0); if (!it) { /* For flock requests we immediatelly return without further delay and let caller deal with the rest, since rest of @@ -1144,7 +1153,7 @@ int mdc_intent_getattr_async(struct obd_export *exp, } rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL, - 0, &minfo->mi_lockh, 1); + 0, LVB_T_NONE, &minfo->mi_lockh, 1); if (rc < 0) { mdc_exit_request(&obddev->u.cli); ptlrpc_req_finished(req); diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index b42e5bf..1c5d473 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -3819,6 +3819,7 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info, if (qmt == NULL) RETURN(-EOPNOTSUPP); + (*lockp)->l_lvb_type = LVB_T_LQUOTA; /* pass the request to quota master */ rc = qmt_hdls.qmth_intent_policy(info->mti_env, qmt, mdt_info_req(info), lockp, @@ -3826,6 +3827,14 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info, RETURN(rc); } + if (opc == MDT_IT_LAYOUT) { + (*lockp)->l_lvb_type = LVB_T_LAYOUT; + /* XXX: set replay RMF_DLM_LVB as the real EA size when LAYOUT + * lock enabled. */ + } else if (opc == MDT_IT_READDIR) { + req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0); + } + flv = &mdt_it_flavor[opc]; if (flv->it_fmt != NULL) req_capsule_extend(pill, flv->it_fmt); @@ -3888,6 +3897,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns, } else { /* No intent was provided */ LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE); + req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0); rc = req_capsule_server_pack(pill); if (rc) rc = err_serious(rc); diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 57d0d77..a40e208 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -734,7 +734,7 @@ static inline int mdt_fid_lock(struct ldlm_namespace *ns, rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy, mode, &flags, mdt_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, - client_cookie, lh); + LVB_T_NONE, client_cookie, lh); return rc == ELDLM_OK ? 0 : -EIO; } diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 51ef90e..b5bdac7 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -860,7 +860,8 @@ static int mdt_rename_lock(struct mdt_thread_info *info, rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy, LCK_EX, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, - &info->mti_exp->exp_handle.h_cookie, + LVB_T_NONE, + &info->mti_exp->exp_handle.h_cookie, lh); } else { struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX, @@ -870,7 +871,8 @@ static int mdt_rename_lock(struct mdt_thread_info *info, * other clients. */ rc = ldlm_cli_enqueue(ms->ms_control_exp, NULL, &einfo, res_id, - policy, &flags, NULL, 0, lh, 0); + policy, &flags, NULL, 0, LVB_T_NONE, lh, + 0); } RETURN(rc); diff --git a/lustre/mgc/mgc_request.c b/lustre/mgc/mgc_request.c index e4ed97c..4dd8d72 100644 --- a/lustre/mgc/mgc_request.c +++ b/lustre/mgc/mgc_request.c @@ -927,6 +927,8 @@ static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, LDLM_ENQUEUE); if (req == NULL) RETURN(-ENOMEM); + + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, 0); ptlrpc_request_set_replen(req); /* check if this is server or client */ @@ -938,7 +940,7 @@ static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm, /* Limit how long we will wait for the enqueue to complete */ req->rq_delay_limit = short_limit ? 5 : MGC_ENQUEUE_LIMIT; rc = ldlm_cli_enqueue(exp, &req, &einfo, &cld->cld_resid, NULL, flags, - NULL, 0, lockh, 0); + NULL, 0, LVB_T_NONE, lockh, 0); /* A failed enqueue should still call the mgc_blocking_ast, where it will be requeued if needed ("grant failed"). */ ptlrpc_req_finished(req); diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index ce6ada3..0b6b40b 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -210,7 +210,8 @@ void mgs_revoke_lock(struct mgs_device *mgs, struct fs_db *fsdb, int type) rc = ldlm_cli_enqueue_local(mgs->mgs_obd->obd_namespace, &res_id, LDLM_PLAIN, NULL, LCK_EX, &flags, ldlm_blocking_ast, cp, - NULL, fsdb, 0, NULL, &lockh); + NULL, fsdb, 0, LVB_T_NONE, NULL, + &lockh); if (rc != ELDLM_OK) { CERROR("can't take cfg lock for "LPX64"/"LPX64"(%d)\n", le64_to_cpu(res_id.name[0]), diff --git a/lustre/obdclass/obd_mount.c b/lustre/obdclass/obd_mount.c index 578492e..fb270ea 100644 --- a/lustre/obdclass/obd_mount.c +++ b/lustre/obdclass/obd_mount.c @@ -681,7 +681,8 @@ static int lustre_start_mgc(struct super_block *sb) /* We connect to the MGS at setup, and don't disconnect until cleanup */ data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_AT | - OBD_CONNECT_FULL20 | OBD_CONNECT_IMP_RECOV; + OBD_CONNECT_FULL20 | OBD_CONNECT_IMP_RECOV | + OBD_CONNECT_LVB_TYPE; #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 2, 50, 0) data->ocd_connect_flags |= OBD_CONNECT_MNE_SWAB; @@ -1056,6 +1057,7 @@ static int lustre_osp_connect(struct obd_device *osp) data->ocd_connect_flags |= OBD_CONNECT_ACL | OBD_CONNECT_IBITS | OBD_CONNECT_MDS_MDS | OBD_CONNECT_FID | OBD_CONNECT_AT | OBD_CONNECT_FULL20 | + OBD_CONNECT_LVB_TYPE | OBD_CONNECT_LIGHTWEIGHT; OBD_ALLOC_PTR(uuid); if (uuid == NULL) diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index 044c138..7494d3f 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -576,8 +576,8 @@ static int echo_setup(struct obd_device *obd, struct lustre_cfg *lcfg) rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_PLAIN, NULL, LCK_NL, &lock_flags, NULL, - ldlm_completion_ast, NULL, NULL, 0, NULL, - &obd->u.echo.eo_nl_lock); + ldlm_completion_ast, NULL, NULL, 0, + LVB_T_NONE, NULL, &obd->u.echo.eo_nl_lock); LASSERT (rc == ELDLM_OK); lprocfs_echo_init_vars(&lvars); diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 492ad2d..43619cc 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -3005,7 +3005,7 @@ static int echo_client_setup(const struct lu_env *env, ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL | OBD_CONNECT_GRANT | OBD_CONNECT_FULL20 | - OBD_CONNECT_64BITHASH; + OBD_CONNECT_64BITHASH | OBD_CONNECT_LVB_TYPE; ocd->ocd_version = LUSTRE_VERSION_CODE; ocd->ocd_group = FID_SEQ_ECHO; diff --git a/lustre/ofd/ofd_dlm.c b/lustre/ofd/ofd_dlm.c index c0bbd15..6ed8faf 100644 --- a/lustre/ofd/ofd_dlm.c +++ b/lustre/ofd/ofd_dlm.c @@ -111,6 +111,7 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, CFS_LIST_HEAD(gl_list); ENTRY; + lock->l_lvb_type = LVB_T_OST; policy = ldlm_get_processing_policy(res); LASSERT(policy != NULL); LASSERT(req != NULL); diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h index 45b8e3d..7b37b0d 100644 --- a/lustre/ofd/ofd_internal.h +++ b/lustre/ofd/ofd_internal.h @@ -295,6 +295,7 @@ struct ofd_thread_info { /* Space used by the I/O, used by grant code */ unsigned long fti_used; + struct ost_lvb fti_lvb; }; extern void target_recovery_fini(struct obd_device *obd); diff --git a/lustre/ofd/ofd_lvb.c b/lustre/ofd/ofd_lvb.c index 2e201cb..61463a8 100644 --- a/lustre/ofd/ofd_lvb.c +++ b/lustre/ofd/ofd_lvb.c @@ -154,13 +154,34 @@ static int ofd_lvbo_update(struct ldlm_resource *res, /* Update the LVB from the network message */ if (req != NULL) { struct ost_lvb *rpc_lvb; - - /* XXX update always from reply buffer */ - rpc_lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB); - if (rpc_lvb == NULL) { - CERROR("lustre_swab_buf failed\n"); - goto disk_update; + bool lvb_type; + + if (req->rq_import != NULL) + lvb_type = imp_connect_lvb_type(req->rq_import); + else + lvb_type = exp_connect_lvb_type(req->rq_export); + + if (!lvb_type) { + struct ost_lvb_v1 *lvb_v1; + + lvb_v1 = req_capsule_server_swab_get(&req->rq_pill, + &RMF_DLM_LVB, lustre_swab_ost_lvb_v1); + if (lvb_v1 == NULL) + goto disk_update; + + rpc_lvb = &info->fti_lvb; + memcpy(rpc_lvb, lvb_v1, sizeof *lvb_v1); + rpc_lvb->lvb_mtime_ns = 0; + rpc_lvb->lvb_atime_ns = 0; + rpc_lvb->lvb_ctime_ns = 0; + } else { + rpc_lvb = req_capsule_server_swab_get(&req->rq_pill, + &RMF_DLM_LVB, + lustre_swab_ost_lvb); + if (rpc_lvb == NULL) + goto disk_update; } + lock_res(res); if (rpc_lvb->lvb_size > lvb->lvb_size || !increase_only) { CDEBUG(D_DLMTRACE, "res: "LPU64" updating lvb size: " @@ -249,22 +270,24 @@ out_unlock: return rc; } -static int ofd_lvbo_size(struct ldlm_lock *unused) +static int ofd_lvbo_size(struct ldlm_lock *lock) { - return sizeof(struct ost_lvb); + if (lock->l_export != NULL && exp_connect_lvb_type(lock->l_export)) + return sizeof(struct ost_lvb); + else + return sizeof(struct ost_lvb_v1); } static int ofd_lvbo_fill(struct ldlm_lock *lock, void *buf, int buflen) { struct ldlm_resource *res = lock->l_resource; + int lvb_len = min_t(int, res->lr_lvb_len, buflen); lock_res(res); - LASSERTF(buflen >= res->lr_lvb_len, - "actual %d, want %d\n", buflen, res->lr_lvb_len); - memcpy(buf, res->lr_lvb_data, res->lr_lvb_len); + memcpy(buf, res->lr_lvb_data, lvb_len); unlock_res(res); - return res->lr_lvb_len; + return lvb_len; } struct ldlm_valblock_ops ofd_lvbo = { diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index 5242519..2f255fe 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -961,7 +961,7 @@ static int ofd_destroy_by_fid(const struct lu_env *env, rc = ldlm_cli_enqueue_local(ofd->ofd_namespace, &info->fti_resid, LDLM_EXTENT, &policy, LCK_PW, &flags, ldlm_blocking_ast, ldlm_completion_ast, - NULL, NULL, 0, NULL, &lockh); + NULL, NULL, 0, LVB_T_NONE, NULL, &lockh); /* We only care about the side-effects, just drop the lock. */ if (rc == ELDLM_OK) diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index b4b5e5a..1772093 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -888,6 +888,11 @@ static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data) obj = lock->cll_descr.cld_obj; result = cl_object_glimpse(env, obj, lvb); } + if (!exp_connect_lvb_type(req->rq_export)) + req_capsule_shrink(&req->rq_pill, + &RMF_DLM_LVB, + sizeof(struct ost_lvb_v1), + RCL_SERVER); osc_ast_data_put(env, olck); } else { /* diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 7b84e6a..357b893 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -2576,7 +2576,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, *flags &= ~LDLM_FL_BLOCK_GRANTED; rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb, - sizeof(*lvb), lockh, async); + sizeof(*lvb), LVB_T_OST, lockh, async); if (rqset) { if (!rc) { struct osc_enqueue_args *aa; diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 823e77b..98dc914 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -684,7 +684,8 @@ static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp, OBD_CONNECT_REQPORTAL | OBD_CONNECT_SKIP_ORPHAN | OBD_CONNECT_VERSION | - OBD_CONNECT_FID; + OBD_CONNECT_FID | + OBD_CONNECT_LVB_TYPE; if (is_osp_on_ost(osp->opd_obd->obd_name)) ocd->ocd_connect_flags |= OBD_CONNECT_LIGHTWEIGHT; diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index b560279..3d789d9 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -245,7 +245,8 @@ static int ost_lock_get(struct obd_export *exp, struct obdo *oa, RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id, LDLM_EXTENT, &policy, mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, - ldlm_glimpse_ast, NULL, 0, NULL, lh)); + ldlm_glimpse_ast, NULL, 0, LVB_T_NONE, + NULL, lh)); } /* Helper function: release lock, if any. */ @@ -642,7 +643,8 @@ static int ost_brw_lock_get(int mode, struct obd_export *exp, RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id, LDLM_EXTENT, &policy, mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, - ldlm_glimpse_ast, NULL, 0, NULL, lh)); + ldlm_glimpse_ast, NULL, 0, LVB_T_NONE, + NULL, lh)); } static void ost_brw_lock_put(int mode, diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index 20bde6c..11796ae 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -892,8 +892,7 @@ struct req_msg_field RMF_LDLM_INTENT = EXPORT_SYMBOL(RMF_LDLM_INTENT); struct req_msg_field RMF_DLM_LVB = - DEFINE_MSGF("dlm_lvb", 0, sizeof(union ldlm_wire_lvb), lustre_swab_lvb, - NULL); + DEFINE_MSGF("dlm_lvb", 0, -1, NULL, NULL); EXPORT_SYMBOL(RMF_DLM_LVB); struct req_msg_field RMF_DLM_GL_DESC = @@ -1815,8 +1814,8 @@ EXPORT_SYMBOL(req_capsule_client_get); * unused too. */ void *req_capsule_client_swab_get(struct req_capsule *pill, - const struct req_msg_field *field, - void (*swabber)(void* )) + const struct req_msg_field *field, + void *swabber) { return __req_capsule_get(pill, field, RCL_CLIENT, swabber, 0); } @@ -1879,6 +1878,15 @@ void *req_capsule_server_sized_get(struct req_capsule *pill, } EXPORT_SYMBOL(req_capsule_server_sized_get); +void *req_capsule_server_sized_swab_get(struct req_capsule *pill, + const struct req_msg_field *field, + int len, void *swabber) +{ + req_capsule_set_size(pill, field, RCL_SERVER, len); + return __req_capsule_get(pill, field, RCL_SERVER, swabber, 0); +} +EXPORT_SYMBOL(req_capsule_server_sized_swab_get); + /** * Returns the buffer of a \a pill corresponding to the given \a field from the * request (if the caller is executing on the server-side) or reply (if the diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 9ac472d..e94bef0 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -1833,20 +1833,39 @@ void lustre_swab_gl_desc(union ldlm_gl_desc *desc) CLASSERT(offsetof(typeof(desc->lquota_desc), gl_pad2) != 0); } -void lustre_swab_lvb(union ldlm_wire_lvb *lvb) +void lustre_swab_ost_lvb_v1(struct ost_lvb_v1 *lvb) { - /* The ldlm_wire_lvb union represents all the possible LVB types. - * Unfortunately, there is no way to know what member of the union we - * are dealing with at this point. Therefore, all LVB structures must - * have fields of the same types, although used for different purposes - */ - __swab64s(&lvb->l_ost.lvb_size); - __swab64s(&lvb->l_ost.lvb_mtime); - __swab64s(&lvb->l_ost.lvb_atime); - __swab64s(&lvb->l_ost.lvb_ctime); - __swab64s(&lvb->l_ost.lvb_blocks); + __swab64s(&lvb->lvb_size); + __swab64s(&lvb->lvb_mtime); + __swab64s(&lvb->lvb_atime); + __swab64s(&lvb->lvb_ctime); + __swab64s(&lvb->lvb_blocks); +} +EXPORT_SYMBOL(lustre_swab_ost_lvb_v1); + +void lustre_swab_ost_lvb(struct ost_lvb *lvb) +{ + __swab64s(&lvb->lvb_size); + __swab64s(&lvb->lvb_mtime); + __swab64s(&lvb->lvb_atime); + __swab64s(&lvb->lvb_ctime); + __swab64s(&lvb->lvb_blocks); + __swab32s(&lvb->lvb_mtime_ns); + __swab32s(&lvb->lvb_atime_ns); + __swab32s(&lvb->lvb_ctime_ns); + __swab32s(&lvb->lvb_padding); +} +EXPORT_SYMBOL(lustre_swab_ost_lvb); + +void lustre_swab_lquota_lvb(struct lquota_lvb *lvb) +{ + __swab64s(&lvb->lvb_flags); + __swab64s(&lvb->lvb_id_may_rel); + __swab64s(&lvb->lvb_id_rel); + __swab64s(&lvb->lvb_id_qunit); + __swab64s(&lvb->lvb_pad1); } -EXPORT_SYMBOL(lustre_swab_lvb); +EXPORT_SYMBOL(lustre_swab_lquota_lvb); void lustre_swab_mdt_body (struct mdt_body *b) { diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 8006b25..fdfaea6 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -54,8 +54,8 @@ void lustre_assert_wire_constants(void) { /* Wire protocol assertions generated by 'wirecheck' * (make -C lustre/utils newwiretest) - * running on Linux chopin 2.6.32.wc #9 SMP Thu Feb 9 14:43:41 CST 2012 x86_64 x86_64 x86_64 - * with gcc version 4.4.4 20100726 (Red Hat 4.4.4-13) (GCC) */ + * running on Linux mercury 2.6.32-279.5.1.el6_lustre.x86_64 #1 SMP Tue Aug 21 00:00:41 PDT 2 + * with gcc version 4.4.6 20120305 (Red Hat 4.4.6-4) (GCC) */ /* Constants... */ @@ -3049,8 +3049,32 @@ void lustre_assert_wire_constants(void) LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_policy_res2) == 8, "found %lld\n", (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_policy_res2)); + /* Checks for struct ost_lvb_v1 */ + LASSERTF((int)sizeof(struct ost_lvb_v1) == 40, "found %lld\n", + (long long)(int)sizeof(struct ost_lvb_v1)); + LASSERTF((int)offsetof(struct ost_lvb_v1, lvb_size) == 0, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb_v1, lvb_size)); + LASSERTF((int)sizeof(((struct ost_lvb_v1 *)0)->lvb_size) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb_v1 *)0)->lvb_size)); + LASSERTF((int)offsetof(struct ost_lvb_v1, lvb_mtime) == 8, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb_v1, lvb_mtime)); + LASSERTF((int)sizeof(((struct ost_lvb_v1 *)0)->lvb_mtime) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb_v1 *)0)->lvb_mtime)); + LASSERTF((int)offsetof(struct ost_lvb_v1, lvb_atime) == 16, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb_v1, lvb_atime)); + LASSERTF((int)sizeof(((struct ost_lvb_v1 *)0)->lvb_atime) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb_v1 *)0)->lvb_atime)); + LASSERTF((int)offsetof(struct ost_lvb_v1, lvb_ctime) == 24, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb_v1, lvb_ctime)); + LASSERTF((int)sizeof(((struct ost_lvb_v1 *)0)->lvb_ctime) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb_v1 *)0)->lvb_ctime)); + LASSERTF((int)offsetof(struct ost_lvb_v1, lvb_blocks) == 32, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb_v1, lvb_blocks)); + LASSERTF((int)sizeof(((struct ost_lvb_v1 *)0)->lvb_blocks) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb_v1 *)0)->lvb_blocks)); + /* Checks for struct ost_lvb */ - LASSERTF((int)sizeof(struct ost_lvb) == 40, "found %lld\n", + LASSERTF((int)sizeof(struct ost_lvb) == 56, "found %lld\n", (long long)(int)sizeof(struct ost_lvb)); LASSERTF((int)offsetof(struct ost_lvb, lvb_size) == 0, "found %lld\n", (long long)(int)offsetof(struct ost_lvb, lvb_size)); @@ -3072,6 +3096,22 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ost_lvb, lvb_blocks)); LASSERTF((int)sizeof(((struct ost_lvb *)0)->lvb_blocks) == 8, "found %lld\n", (long long)(int)sizeof(((struct ost_lvb *)0)->lvb_blocks)); + LASSERTF((int)offsetof(struct ost_lvb, lvb_mtime_ns) == 40, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb, lvb_mtime_ns)); + LASSERTF((int)sizeof(((struct ost_lvb *)0)->lvb_mtime_ns) == 4, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb *)0)->lvb_mtime_ns)); + LASSERTF((int)offsetof(struct ost_lvb, lvb_atime_ns) == 44, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb, lvb_atime_ns)); + LASSERTF((int)sizeof(((struct ost_lvb *)0)->lvb_atime_ns) == 4, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb *)0)->lvb_atime_ns)); + LASSERTF((int)offsetof(struct ost_lvb, lvb_ctime_ns) == 48, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb, lvb_ctime_ns)); + LASSERTF((int)sizeof(((struct ost_lvb *)0)->lvb_ctime_ns) == 4, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb *)0)->lvb_ctime_ns)); + LASSERTF((int)offsetof(struct ost_lvb, lvb_padding) == 52, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb, lvb_padding)); + LASSERTF((int)sizeof(((struct ost_lvb *)0)->lvb_padding) == 4, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb *)0)->lvb_padding)); /* Checks for struct lquota_lvb */ LASSERTF((int)sizeof(struct lquota_lvb) == 40, "found %lld\n", diff --git a/lustre/quota/qmt_lock.c b/lustre/quota/qmt_lock.c index 8b7abb6..70ab3b4 100644 --- a/lustre/quota/qmt_lock.c +++ b/lustre/quota/qmt_lock.c @@ -56,6 +56,8 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, ENTRY; req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA); + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + ldlm_lvbo_size(*lockp)); /* extract quota body and intent opc */ it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT); @@ -132,8 +134,6 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld, } /* on success, pack lvb in reply */ - req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, - ldlm_lvbo_size(*lockp)); lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB); ldlm_lvbo_fill(*lockp, lvb, ldlm_lvbo_size(*lockp)); EXIT; @@ -252,7 +252,8 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, /* no need to update lvb for global quota locks */ RETURN(0); - lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB); + lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB, + lustre_swab_lquota_lvb); if (lvb == NULL) { CERROR("%s: failed to extract lvb from request\n", qmt->qmt_svname); diff --git a/lustre/quota/qsd_handler.c b/lustre/quota/qsd_handler.c index 3b87ddb..ef12ddb 100644 --- a/lustre/quota/qsd_handler.c +++ b/lustre/quota/qsd_handler.c @@ -308,7 +308,7 @@ static void qsd_req_completion(const struct lu_env *env, struct quota_body *reqbody, struct quota_body *repbody, struct lustre_handle *lockh, - union ldlm_wire_lvb *lvb, + struct lquota_lvb *lvb, void *arg, int ret) { struct lquota_entry *lqe = (struct lquota_entry *)arg; @@ -390,9 +390,9 @@ static void qsd_req_completion(const struct lu_env *env, /* extract information from lvb */ if (ret == 0 && lvb != 0) { - if (lvb->l_lquota.lvb_id_qunit != 0) - qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit); - if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT) + if (lvb->lvb_id_qunit != 0) + qsd_set_qunit(lqe, lvb->lvb_id_qunit); + if (lvb->lvb_flags & LQUOTA_FL_EDQUOT) lqe->lqe_edquot = true; else lqe->lqe_edquot = false; @@ -590,7 +590,7 @@ static int qsd_acquire_remote(const struct lu_env *env, /* check whether we already own a valid lock for this ID */ rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh); if (rc) { - union ldlm_wire_lvb *lvb; + struct lquota_lvb *lvb; OBD_ALLOC_PTR(lvb); if (lvb == NULL) { @@ -982,7 +982,7 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe) qsd_req_completion, qqi, &qti->qti_lockh, lqe); } else { - union ldlm_wire_lvb *lvb; + struct lquota_lvb *lvb; OBD_ALLOC_PTR(lvb); if (lvb == NULL) diff --git a/lustre/quota/qsd_internal.h b/lustre/quota/qsd_internal.h index 3f9f1e48..f76a6ce 100644 --- a/lustre/quota/qsd_internal.h +++ b/lustre/quota/qsd_internal.h @@ -245,7 +245,7 @@ struct qsd_thread_info { struct ldlm_enqueue_info qti_einfo; struct lustre_handle qti_lockh; __u64 qti_slv_ver; - union ldlm_wire_lvb qti_lvb; + struct lquota_lvb qti_lvb; union { struct quota_body qti_body; struct idx_info qti_ii; @@ -351,14 +351,14 @@ typedef void (*qsd_req_completion_t) (const struct lu_env *, struct qsd_qtype_info *, struct quota_body *, struct quota_body *, struct lustre_handle *, - union ldlm_wire_lvb *, void *, int); + struct lquota_lvb *, void *, int); int qsd_send_dqacq(const struct lu_env *, struct obd_export *, struct quota_body *, bool, qsd_req_completion_t, struct qsd_qtype_info *, struct lustre_handle *, struct lquota_entry *); int qsd_intent_lock(const struct lu_env *, struct obd_export *, struct quota_body *, bool, int, qsd_req_completion_t, - struct qsd_qtype_info *, union ldlm_wire_lvb *, void *); + struct qsd_qtype_info *, struct lquota_lvb *, void *); int qsd_fetch_index(const struct lu_env *, struct obd_export *, struct idx_info *, unsigned int, cfs_page_t **, bool *); diff --git a/lustre/quota/qsd_lock.c b/lustre/quota/qsd_lock.c index a21505d..68c8352 100644 --- a/lustre/quota/qsd_lock.c +++ b/lustre/quota/qsd_lock.c @@ -126,6 +126,8 @@ static int qsd_common_glimpse_ast(struct ptlrpc_request *req, RETURN(-EFAULT); /* prepare reply */ + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + sizeof(struct lquota_lvb)); rc = req_capsule_server_pack(&req->rq_pill); if (rc != 0) { CERROR("Can't pack response, rc %d\n", rc); diff --git a/lustre/quota/qsd_reint.c b/lustre/quota/qsd_reint.c index c558bc9..aa317fb 100644 --- a/lustre/quota/qsd_reint.c +++ b/lustre/quota/qsd_reint.c @@ -44,7 +44,7 @@ static void qsd_reint_completion(const struct lu_env *env, struct quota_body *req_qbody, struct quota_body *rep_qbody, struct lustre_handle *lockh, - union ldlm_wire_lvb *lvb, + struct lquota_lvb *lvb, void *arg, int rc) { struct qsd_instance *qsd = qqi->qqi_qsd; @@ -62,7 +62,7 @@ static void qsd_reint_completion(const struct lu_env *env, CDEBUG(D_QUOTA, "%s: global quota lock successfully acquired, glb " "fid:"DFID", glb ver:"LPU64", slv fid:"DFID", slv ver:"LPU64"\n", qsd->qsd_svname, PFID(&req_qbody->qb_fid), - lvb->l_lquota.lvb_glb_ver, PFID(&rep_qbody->qb_slv_fid), + lvb->lvb_glb_ver, PFID(&rep_qbody->qb_slv_fid), rep_qbody->qb_slv_ver); *slv_ver = rep_qbody->qb_slv_ver; @@ -457,7 +457,7 @@ static int qsd_reint_main(void *args) ldlm_lock_addref_try(&qqi->qqi_lockh, qsd_glb_einfo.ei_mode) == 0) { read_unlock(&qsd->qsd_lock); /* force refresh of global & slave index copy */ - qti->qti_lvb.l_lquota.lvb_glb_ver = ~0ULL; + qti->qti_lvb.lvb_glb_ver = ~0ULL; qti->qti_slv_ver = ~0ULL; } else { /* no valid lock found, let's enqueue a new one */ @@ -475,7 +475,7 @@ static int qsd_reint_main(void *args) CDEBUG(D_QUOTA, "%s: glb_ver:"LPU64"/"LPU64",slv_ver:"LPU64"/" LPU64"\n", qsd->qsd_svname, - qti->qti_lvb.l_lquota.lvb_glb_ver, qqi->qqi_glb_ver, + qti->qti_lvb.lvb_glb_ver, qqi->qqi_glb_ver, qti->qti_slv_ver, qqi->qqi_slv_ver); } @@ -485,7 +485,7 @@ static int qsd_reint_main(void *args) OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REINT, 10); - if (qqi->qqi_glb_ver != qti->qti_lvb.l_lquota.lvb_glb_ver) { + if (qqi->qqi_glb_ver != qti->qti_lvb.lvb_glb_ver) { rc = qsd_reint_index(env, qqi, true); if (rc) { CWARN("%s: reint global for "DFID" failed. %d\n", diff --git a/lustre/quota/qsd_request.c b/lustre/quota/qsd_request.c index c5e61e4..50bf8db 100644 --- a/lustre/quota/qsd_request.c +++ b/lustre/quota/qsd_request.c @@ -45,7 +45,7 @@ struct qsd_async_args { struct obd_export *aa_exp; struct qsd_qtype_info *aa_qqi; void *aa_arg; - union ldlm_wire_lvb *aa_lvb; + struct lquota_lvb *aa_lvb; struct lustre_handle aa_lockh; qsd_req_completion_t aa_completion; }; @@ -173,7 +173,7 @@ static int qsd_intent_interpret(const struct lu_env *env, rc = ldlm_cli_enqueue_fini(aa->aa_exp, req, LDLM_PLAIN, 0, LCK_CR, &flags, (void *)aa->aa_lvb, - sizeof(union ldlm_wire_lvb), lockh, rc); + sizeof(struct lquota_lvb), lockh, rc); if (rc < 0) /* the lock has been destroyed, forget about the lock handle */ memset(lockh, 0, sizeof(*lockh)); @@ -208,7 +208,7 @@ static int qsd_intent_interpret(const struct lu_env *env, int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp, struct quota_body *qbody, bool sync, int it_op, qsd_req_completion_t completion, struct qsd_qtype_info *qqi, - union ldlm_wire_lvb *lvb, void *arg) + struct lquota_lvb *lvb, void *arg) { struct qsd_thread_info *qti = qsd_info(env); struct ptlrpc_request *req; @@ -242,6 +242,8 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp, req_qbody = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_BODY); *req_qbody = *qbody; + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, + sizeof(*lvb)); ptlrpc_request_set_replen(req); switch(it_op) { @@ -271,8 +273,8 @@ int qsd_intent_lock(const struct lu_env *env, struct obd_export *exp, /* build lock enqueue request */ rc = ldlm_cli_enqueue(exp, &req, &qti->qti_einfo, &qti->qti_resid, NULL, - &flags, (void *)lvb, sizeof(*lvb), &qti->qti_lockh, - 1); + &flags, (void *)lvb, sizeof(*lvb), LVB_T_LQUOTA, + &qti->qti_lockh, 1); if (rc < 0) { ptlrpc_req_finished(req); GOTO(out, rc); diff --git a/lustre/utils/req-layout.c b/lustre/utils/req-layout.c index 8c4c74d..46910b7 100644 --- a/lustre/utils/req-layout.c +++ b/lustre/utils/req-layout.c @@ -76,7 +76,8 @@ #define lustre_swab_idx_info NULL #define lustre_swab_qdata NULL #define lustre_swab_quota_body NULL -#define lustre_swab_lvb NULL +#define lustre_swab_ost_lvb_v1 NULL +#define lustre_swab_ost_lvb NULL #define lustre_swab_gl_desc NULL #define lustre_swab_mgs_config_body NULL #define lustre_swab_mgs_config_res NULL diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index b114274..c745497 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -497,9 +497,9 @@ check_obd_connect_data(void) CHECK_DEFINE_64X(OBD_CONNECT_EINPROGRESS); CHECK_DEFINE_64X(OBD_CONNECT_GRANT_PARAM); CHECK_DEFINE_64X(OBD_CONNECT_FLOCK_OWNER); - CHECK_DEFINE_64X(OBD_CONNECT_LVB_TYPE); - CHECK_DEFINE_64X(OBD_CONNECT_NANOSEC_TIME); - CHECK_DEFINE_64X(OBD_CONNECT_LIGHTWEIGHT); + CHECK_DEFINE_64X(OBD_CONNECT_LVB_TYPE); + CHECK_DEFINE_64X(OBD_CONNECT_NANOSEC_TIME); + CHECK_DEFINE_64X(OBD_CONNECT_LIGHTWEIGHT); CHECK_DEFINE_64X(OBD_CONNECT_SHORTIO); CHECK_VALUE_X(OBD_CKSUM_CRC32); @@ -1308,15 +1308,31 @@ check_ldlm_reply(void) } static void +check_ldlm_ost_lvb_v1(void) +{ + BLANK_LINE(); + CHECK_STRUCT(ost_lvb_v1); + CHECK_MEMBER(ost_lvb_v1, lvb_size); + CHECK_MEMBER(ost_lvb_v1, lvb_mtime); + CHECK_MEMBER(ost_lvb_v1, lvb_atime); + CHECK_MEMBER(ost_lvb_v1, lvb_ctime); + CHECK_MEMBER(ost_lvb_v1, lvb_blocks); +} + +static void check_ldlm_ost_lvb(void) { - BLANK_LINE(); - CHECK_STRUCT(ost_lvb); - CHECK_MEMBER(ost_lvb, lvb_size); - CHECK_MEMBER(ost_lvb, lvb_mtime); - CHECK_MEMBER(ost_lvb, lvb_atime); - CHECK_MEMBER(ost_lvb, lvb_ctime); - CHECK_MEMBER(ost_lvb, lvb_blocks); + BLANK_LINE(); + CHECK_STRUCT(ost_lvb); + CHECK_MEMBER(ost_lvb, lvb_size); + CHECK_MEMBER(ost_lvb, lvb_mtime); + CHECK_MEMBER(ost_lvb, lvb_atime); + CHECK_MEMBER(ost_lvb, lvb_ctime); + CHECK_MEMBER(ost_lvb, lvb_blocks); + CHECK_MEMBER(ost_lvb, lvb_mtime_ns); + CHECK_MEMBER(ost_lvb, lvb_atime_ns); + CHECK_MEMBER(ost_lvb, lvb_ctime_ns); + CHECK_MEMBER(ost_lvb, lvb_padding); } static void @@ -2148,6 +2164,7 @@ main(int argc, char **argv) check_ldlm_lock_desc(); check_ldlm_request(); check_ldlm_reply(); + check_ldlm_ost_lvb_v1(); check_ldlm_ost_lvb(); check_ldlm_lquota_lvb(); check_ldlm_gl_lquota_desc(); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index a51bcc6..e1df03b 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -62,8 +62,8 @@ void lustre_assert_wire_constants(void) { /* Wire protocol assertions generated by 'wirecheck' * (make -C lustre/utils newwiretest) - * running on Linux chopin 2.6.32.wc #9 SMP Thu Feb 9 14:43:41 CST 2012 x86_64 x86_64 x86_64 - * with gcc version 4.4.4 20100726 (Red Hat 4.4.4-13) (GCC) */ + * running on Linux mercury 2.6.32-279.5.1.el6_lustre.x86_64 #1 SMP Tue Aug 21 00:00:41 PDT 2 + * with gcc version 4.4.6 20120305 (Red Hat 4.4.6-4) (GCC) */ /* Constants... */ @@ -3057,8 +3057,32 @@ void lustre_assert_wire_constants(void) LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_policy_res2) == 8, "found %lld\n", (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_policy_res2)); + /* Checks for struct ost_lvb_v1 */ + LASSERTF((int)sizeof(struct ost_lvb_v1) == 40, "found %lld\n", + (long long)(int)sizeof(struct ost_lvb_v1)); + LASSERTF((int)offsetof(struct ost_lvb_v1, lvb_size) == 0, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb_v1, lvb_size)); + LASSERTF((int)sizeof(((struct ost_lvb_v1 *)0)->lvb_size) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb_v1 *)0)->lvb_size)); + LASSERTF((int)offsetof(struct ost_lvb_v1, lvb_mtime) == 8, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb_v1, lvb_mtime)); + LASSERTF((int)sizeof(((struct ost_lvb_v1 *)0)->lvb_mtime) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb_v1 *)0)->lvb_mtime)); + LASSERTF((int)offsetof(struct ost_lvb_v1, lvb_atime) == 16, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb_v1, lvb_atime)); + LASSERTF((int)sizeof(((struct ost_lvb_v1 *)0)->lvb_atime) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb_v1 *)0)->lvb_atime)); + LASSERTF((int)offsetof(struct ost_lvb_v1, lvb_ctime) == 24, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb_v1, lvb_ctime)); + LASSERTF((int)sizeof(((struct ost_lvb_v1 *)0)->lvb_ctime) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb_v1 *)0)->lvb_ctime)); + LASSERTF((int)offsetof(struct ost_lvb_v1, lvb_blocks) == 32, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb_v1, lvb_blocks)); + LASSERTF((int)sizeof(((struct ost_lvb_v1 *)0)->lvb_blocks) == 8, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb_v1 *)0)->lvb_blocks)); + /* Checks for struct ost_lvb */ - LASSERTF((int)sizeof(struct ost_lvb) == 40, "found %lld\n", + LASSERTF((int)sizeof(struct ost_lvb) == 56, "found %lld\n", (long long)(int)sizeof(struct ost_lvb)); LASSERTF((int)offsetof(struct ost_lvb, lvb_size) == 0, "found %lld\n", (long long)(int)offsetof(struct ost_lvb, lvb_size)); @@ -3080,6 +3104,22 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ost_lvb, lvb_blocks)); LASSERTF((int)sizeof(((struct ost_lvb *)0)->lvb_blocks) == 8, "found %lld\n", (long long)(int)sizeof(((struct ost_lvb *)0)->lvb_blocks)); + LASSERTF((int)offsetof(struct ost_lvb, lvb_mtime_ns) == 40, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb, lvb_mtime_ns)); + LASSERTF((int)sizeof(((struct ost_lvb *)0)->lvb_mtime_ns) == 4, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb *)0)->lvb_mtime_ns)); + LASSERTF((int)offsetof(struct ost_lvb, lvb_atime_ns) == 44, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb, lvb_atime_ns)); + LASSERTF((int)sizeof(((struct ost_lvb *)0)->lvb_atime_ns) == 4, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb *)0)->lvb_atime_ns)); + LASSERTF((int)offsetof(struct ost_lvb, lvb_ctime_ns) == 48, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb, lvb_ctime_ns)); + LASSERTF((int)sizeof(((struct ost_lvb *)0)->lvb_ctime_ns) == 4, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb *)0)->lvb_ctime_ns)); + LASSERTF((int)offsetof(struct ost_lvb, lvb_padding) == 52, "found %lld\n", + (long long)(int)offsetof(struct ost_lvb, lvb_padding)); + LASSERTF((int)sizeof(((struct ost_lvb *)0)->lvb_padding) == 4, "found %lld\n", + (long long)(int)sizeof(((struct ost_lvb *)0)->lvb_padding)); /* Checks for struct lquota_lvb */ LASSERTF((int)sizeof(struct lquota_lvb) == 40, "found %lld\n", -- 1.8.3.1