From: tappro Date: Wed, 18 Mar 2009 09:11:49 +0000 (+0000) Subject: - land b_hd_ver_recov X-Git-Tag: v1_9_166~18 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=033480704626652b36490a50a6359e74c1490690 - land b_hd_ver_recov --- diff --git a/lustre/cmm/cmm_device.c b/lustre/cmm/cmm_device.c index 8245d2f..82c89dc 100644 --- a/lustre/cmm/cmm_device.c +++ b/lustre/cmm/cmm_device.c @@ -57,7 +57,7 @@ # include #endif -static struct obd_ops cmm_obd_device_ops = { +struct obd_ops cmm_obd_device_ops = { .o_owner = THIS_MODULE }; diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index fa52599..281dc0c 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -362,6 +362,18 @@ static int cml_object_sync(const struct lu_env *env, struct md_object *mo) RETURN(rc); } +static dt_obj_version_t cml_version_get(const struct lu_env *env, + struct md_object *mo) +{ + return mo_version_get(env, md_object_next(mo)); +} + +static void cml_version_set(const struct lu_env *env, struct md_object *mo, + dt_obj_version_t version) +{ + return mo_version_set(env, md_object_next(mo), version); +} + static const struct md_object_operations cml_mo_ops = { .moo_permission = cml_permission, .moo_attr_get = cml_attr_get, @@ -379,6 +391,8 @@ static const struct md_object_operations cml_mo_ops = { .moo_readlink = cml_readlink, .moo_capa_get = cml_capa_get, .moo_object_sync = cml_object_sync, + .moo_version_get = cml_version_get, + .moo_version_set = cml_version_set, .moo_path = cml_path, }; @@ -953,6 +967,18 @@ static int cmr_object_sync(const struct lu_env *env, struct md_object *mo) return -EFAULT; } +static dt_obj_version_t cmr_version_get(const struct lu_env *env, + struct md_object *mo) +{ + LBUG(); +} + +static void cmr_version_set(const struct lu_env *env, struct md_object *mo, + dt_obj_version_t version) +{ + LBUG(); +} + static const struct md_object_operations cmr_mo_ops = { .moo_permission = cmr_permission, .moo_attr_get = 
cmr_attr_get, @@ -970,6 +996,8 @@ static const struct md_object_operations cmr_mo_ops = { .moo_readlink = cmr_readlink, .moo_capa_get = cmr_capa_get, .moo_object_sync = cmr_object_sync, + .moo_version_get = cmr_version_get, + .moo_version_set = cmr_version_set, .moo_path = cmr_path, }; diff --git a/lustre/include/Makefile.am b/lustre/include/Makefile.am index d18e1a9..7496e1d 100644 --- a/lustre/include/Makefile.am +++ b/lustre/include/Makefile.am @@ -46,4 +46,4 @@ EXTRA_DIST = ioctl.h liblustre.h lprocfs_status.h lustre_cfg.h \ md_object.h dt_object.h lustre_param.h lustre_mdt.h \ lustre_fid.h lustre_fld.h lustre_req_layout.h lustre_capa.h \ lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h \ - lu_ref.h cl_object.h lustre_acl.h lclient.h + lu_ref.h cl_object.h lustre_acl.h lclient.h lu_target.h diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 4d33e83..160f47a 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -242,6 +242,9 @@ struct dt_object_format { enum dt_format_type dt_mode_to_dft(__u32 mode); +/** Version type. May differ in DMU and ldiskfs */ +typedef __u64 dt_obj_version_t; + /** * Per-dt-object operations. */ @@ -371,6 +374,10 @@ struct dt_object_operations { struct lustre_capa *old, __u64 opc); int (*do_object_sync)(const struct lu_env *, struct dt_object *); + dt_obj_version_t (*do_version_get)(const struct lu_env *env, + struct dt_object *dt); + void (*do_version_set)(const struct lu_env *env, struct dt_object *dt, + dt_obj_version_t new_version); /** * Get object info of next level. Currently, only get inode from osd. 
* This is only used by quota b=16542 @@ -572,6 +579,7 @@ struct dt_txn_callback { int (*dtc_txn_commit)(const struct lu_env *env, struct thandle *txn, void *cookie); void *dtc_cookie; + __u32 dtc_tag; struct list_head dtc_linkage; }; @@ -609,5 +617,40 @@ struct dt_object *dt_locate(const struct lu_env *env, struct dt_device *dev, const struct lu_fid *fid); +static inline dt_obj_version_t do_version_get(const struct lu_env *env, + struct dt_object *o) +{ + LASSERT(o->do_ops->do_version_get); + return o->do_ops->do_version_get(env, o); +} + +static inline void do_version_set(const struct lu_env *env, + struct dt_object *o, dt_obj_version_t v) +{ + LASSERT(o->do_ops->do_version_set); + return o->do_ops->do_version_set(env, o, v); +} + +int dt_record_read(const struct lu_env *env, struct dt_object *dt, + struct lu_buf *buf, loff_t *pos); +int dt_record_write(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, loff_t *pos, struct thandle *th); + + +static inline struct thandle *dt_trans_start(const struct lu_env *env, + struct dt_device *d, + struct txn_param *p) +{ + LASSERT(d->dd_ops->dt_trans_start); + return d->dd_ops->dt_trans_start(env, d, p); +} + +static inline void dt_trans_stop(const struct lu_env *env, + struct dt_device *d, + struct thandle *th) +{ + LASSERT(d->dd_ops->dt_trans_stop); + return d->dd_ops->dt_trans_stop(env, th); +} /** @} dt */ #endif /* __LUSTRE_DT_OBJECT_H */ diff --git a/lustre/include/linux/lustre_fsfilt.h b/lustre/include/linux/lustre_fsfilt.h index 484d267..b9e01e6 100644 --- a/lustre/include/linux/lustre_fsfilt.h +++ b/lustre/include/linux/lustre_fsfilt.h @@ -495,7 +495,7 @@ static inline __u64 fsfilt_set_version(struct obd_device *obd, static inline __u64 fsfilt_get_version(struct obd_device *obd, struct inode *inode) { - if (obd->obd_fsops->fs_set_version) + if (obd->obd_fsops->fs_get_version) return obd->obd_fsops->fs_get_version(inode); return -EOPNOTSUPP; } diff --git a/lustre/include/lu_target.h 
b/lustre/include/lu_target.h new file mode 100644 index 0000000..aae74e9 --- /dev/null +++ b/lustre/include/lu_target.h @@ -0,0 +1,80 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. 
+ */ + +#ifndef _LUSTRE_LU_TARGET_H +#define _LUSTRE_LU_TARGET_H + +#include +#include + +struct lu_target { + struct obd_device *lut_obd; + struct dt_device *lut_bottom; + /** last_rcvd file */ + struct dt_object *lut_last_rcvd; + /* transaction callbacks */ + struct dt_txn_callback lut_txn_cb; + /** server data in last_rcvd file */ + struct lr_server_data lut_lsd; + /** Server last transaction number */ + __u64 lut_last_transno; + /** Lock protecting last transaction number */ + spinlock_t lut_translock; + /** Lock protecting client bitmap */ + spinlock_t lut_client_bitmap_lock; + /** Bitmap of known clients */ + unsigned long lut_client_bitmap[LR_CLIENT_BITMAP_SIZE]; + /** Number of mounts */ + __u64 lut_mount_count; + __u32 lut_stale_export_age; + spinlock_t lut_trans_table_lock; +}; + +typedef void (*lut_cb_t)(struct lu_target *lut, __u64 transno, + void *data, int err); +struct lut_commit_cb { + lut_cb_t lut_cb_func; + void *lut_cb_data; +}; + +void lut_boot_epoch_update(struct lu_target *); +void lut_cb_last_committed(struct lu_target *, __u64, void *, int); +void lut_cb_client(struct lu_target *, __u64, void *, int); +int lut_init(const struct lu_env *, struct lu_target *, + struct obd_device *, struct dt_device *); +void lut_fini(const struct lu_env *, struct lu_target *); + +#endif /* __LUSTRE_LU_TARGET_H */ diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 74623ae..176e21a 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -785,7 +785,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); OBD_CONNECT_RMT_CLIENT_FORCE | \ OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | \ OBD_CONNECT_MDS_MDS | OBD_CONNECT_FID | \ - LRU_RESIZE_CONNECT_FLAG | \ + LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_VBR | \ OBD_CONNECT_LOV_V3) #define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \ OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \ @@ -795,9 +795,9 @@ extern void 
lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_CKSUM | \ OBD_CONNECT_CHANGE_QS | \ OBD_CONNECT_OSS_CAPA | OBD_CONNECT_RMT_CLIENT | \ - OBD_CONNECT_RMT_CLIENT_FORCE | \ + OBD_CONNECT_RMT_CLIENT_FORCE | OBD_CONNECT_VBR | \ OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN | \ - OBD_CONNECT_GRANT_SHRINK) + OBD_CONNECT_GRANT_SHRINK) #define ECHO_CONNECT_SUPPORTED (0) #define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION | OBD_CONNECT_AT) diff --git a/lustre/include/lustre_disk.h b/lustre/include/lustre_disk.h index df9fddd..9a81b46 100644 --- a/lustre/include/lustre_disk.h +++ b/lustre/include/lustre_disk.h @@ -192,6 +192,11 @@ struct lustre_mount_data { /****************** last_rcvd file *********************/ +/** version recovery epoch */ +#define LR_EPOCH_BITS 32 +#define lr_epoch(a) ((a) >> LR_EPOCH_BITS) +#define LR_EXPIRE_INTERVALS 16 /**< number of intervals to track transno */ + #define LR_SERVER_SIZE 512 #define LR_CLIENT_START 8192 #define LR_CLIENT_SIZE 128 @@ -211,6 +216,8 @@ struct lustre_mount_data { #define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8) #endif +#define LR_CLIENT_BITMAP_SIZE ((LR_MAX_CLIENTS >> 3) / sizeof(long)) + /** COMPAT_146: this is an OST (temporary) */ #define OBD_COMPAT_OST 0x00000002 /** COMPAT_146: this is an MDT (temporary) */ @@ -254,7 +261,13 @@ struct lr_server_data { __u8 lsd_peeruuid[40]; /* UUID of MDS associated with this OST */ __u32 lsd_ost_index; /* index number of OST in LOV */ __u32 lsd_mdt_index; /* index number of MDT in LMV */ - __u8 lsd_padding[LR_SERVER_SIZE - 148]; + __u32 lsd_start_epoch; /* VBR: start epoch from last boot */ + /** transaction values since lsd_trans_table_time */ + __u64 lsd_trans_table[LR_EXPIRE_INTERVALS]; + /** start point of transno table below */ + __u32 lsd_trans_table_time; /* time of first slot in table above */ + __u32 lsd_expire_intervals; /* LR_EXPIRE_INTERVALS */ + __u8 lsd_padding[LR_SERVER_SIZE - 288]; }; /* Data stored per client in the last_rcvd file. 
In le32 order. */ @@ -269,9 +282,120 @@ struct lsd_client_data { __u64 lcd_last_close_xid; /* xid for the last transaction */ __u32 lcd_last_close_result; /* result from last RPC */ __u32 lcd_last_close_data; /* per-op data */ - __u8 lcd_padding[LR_CLIENT_SIZE - 88]; + /* VBR: last versions */ + __u64 lcd_pre_versions[4]; + __u32 lcd_last_epoch; + /** orphans handling for delayed export rely on that */ + __u32 lcd_first_epoch; + __u8 lcd_padding[LR_CLIENT_SIZE - 128]; }; +/* last_rcvd handling */ +static inline void lsd_le_to_cpu(struct lr_server_data *buf, + struct lr_server_data *lsd) +{ + int i; + memcpy(lsd->lsd_uuid, buf->lsd_uuid, sizeof (lsd->lsd_uuid)); + lsd->lsd_last_transno = le64_to_cpu(buf->lsd_last_transno); + lsd->lsd_compat14 = le64_to_cpu(buf->lsd_compat14); + lsd->lsd_mount_count = le64_to_cpu(buf->lsd_mount_count); + lsd->lsd_feature_compat = le32_to_cpu(buf->lsd_feature_compat); + lsd->lsd_feature_rocompat = le32_to_cpu(buf->lsd_feature_rocompat); + lsd->lsd_feature_incompat = le32_to_cpu(buf->lsd_feature_incompat); + lsd->lsd_server_size = le32_to_cpu(buf->lsd_server_size); + lsd->lsd_client_start = le32_to_cpu(buf->lsd_client_start); + lsd->lsd_client_size = le16_to_cpu(buf->lsd_client_size); + lsd->lsd_subdir_count = le16_to_cpu(buf->lsd_subdir_count); + lsd->lsd_catalog_oid = le64_to_cpu(buf->lsd_catalog_oid); + lsd->lsd_catalog_ogen = le32_to_cpu(buf->lsd_catalog_ogen); + memcpy(lsd->lsd_peeruuid, buf->lsd_peeruuid, sizeof(lsd->lsd_peeruuid)); + lsd->lsd_ost_index = le32_to_cpu(buf->lsd_ost_index); + lsd->lsd_mdt_index = le32_to_cpu(buf->lsd_mdt_index); + lsd->lsd_start_epoch = le32_to_cpu(buf->lsd_start_epoch); + for (i = 0; i < LR_EXPIRE_INTERVALS; i++) + lsd->lsd_trans_table[i] = le64_to_cpu(buf->lsd_trans_table[i]); + lsd->lsd_trans_table_time = le32_to_cpu(buf->lsd_trans_table_time); + lsd->lsd_expire_intervals = le32_to_cpu(buf->lsd_expire_intervals); +} + +static inline void lsd_cpu_to_le(struct lr_server_data *lsd, + struct 
lr_server_data *buf) +{ + int i; + memcpy(buf->lsd_uuid, lsd->lsd_uuid, sizeof (buf->lsd_uuid)); + buf->lsd_last_transno = cpu_to_le64(lsd->lsd_last_transno); + buf->lsd_compat14 = cpu_to_le64(lsd->lsd_compat14); + buf->lsd_mount_count = cpu_to_le64(lsd->lsd_mount_count); + buf->lsd_feature_compat = cpu_to_le32(lsd->lsd_feature_compat); + buf->lsd_feature_rocompat = cpu_to_le32(lsd->lsd_feature_rocompat); + buf->lsd_feature_incompat = cpu_to_le32(lsd->lsd_feature_incompat); + buf->lsd_server_size = cpu_to_le32(lsd->lsd_server_size); + buf->lsd_client_start = cpu_to_le32(lsd->lsd_client_start); + buf->lsd_client_size = cpu_to_le16(lsd->lsd_client_size); + buf->lsd_subdir_count = cpu_to_le16(lsd->lsd_subdir_count); + buf->lsd_catalog_oid = cpu_to_le64(lsd->lsd_catalog_oid); + buf->lsd_catalog_ogen = cpu_to_le32(lsd->lsd_catalog_ogen); + memcpy(buf->lsd_peeruuid, lsd->lsd_peeruuid, sizeof(buf->lsd_peeruuid)); + buf->lsd_ost_index = cpu_to_le32(lsd->lsd_ost_index); + buf->lsd_mdt_index = cpu_to_le32(lsd->lsd_mdt_index); + buf->lsd_start_epoch = cpu_to_le32(lsd->lsd_start_epoch); + for (i = 0; i < LR_EXPIRE_INTERVALS; i++) + buf->lsd_trans_table[i] = cpu_to_le64(lsd->lsd_trans_table[i]); + buf->lsd_trans_table_time = cpu_to_le32(lsd->lsd_trans_table_time); + buf->lsd_expire_intervals = cpu_to_le32(lsd->lsd_expire_intervals); +} + +static inline void lcd_le_to_cpu(struct lsd_client_data *buf, + struct lsd_client_data *lcd) +{ + memcpy(lcd->lcd_uuid, buf->lcd_uuid, sizeof (lcd->lcd_uuid)); + lcd->lcd_last_transno = le64_to_cpu(buf->lcd_last_transno); + lcd->lcd_last_xid = le64_to_cpu(buf->lcd_last_xid); + lcd->lcd_last_result = le32_to_cpu(buf->lcd_last_result); + lcd->lcd_last_data = le32_to_cpu(buf->lcd_last_data); + lcd->lcd_last_close_transno = le64_to_cpu(buf->lcd_last_close_transno); + lcd->lcd_last_close_xid = le64_to_cpu(buf->lcd_last_close_xid); + lcd->lcd_last_close_result = le32_to_cpu(buf->lcd_last_close_result); + lcd->lcd_last_close_data = 
le32_to_cpu(buf->lcd_last_close_data); + lcd->lcd_pre_versions[0] = le64_to_cpu(buf->lcd_pre_versions[0]); + lcd->lcd_pre_versions[1] = le64_to_cpu(buf->lcd_pre_versions[1]); + lcd->lcd_pre_versions[2] = le64_to_cpu(buf->lcd_pre_versions[2]); + lcd->lcd_pre_versions[3] = le64_to_cpu(buf->lcd_pre_versions[3]); + lcd->lcd_last_epoch = le32_to_cpu(buf->lcd_last_epoch); + lcd->lcd_first_epoch = le32_to_cpu(buf->lcd_first_epoch); +} + +static inline void lcd_cpu_to_le(struct lsd_client_data *lcd, + struct lsd_client_data *buf) +{ + memcpy(buf->lcd_uuid, lcd->lcd_uuid, sizeof (lcd->lcd_uuid)); + buf->lcd_last_transno = cpu_to_le64(lcd->lcd_last_transno); + buf->lcd_last_xid = cpu_to_le64(lcd->lcd_last_xid); + buf->lcd_last_result = cpu_to_le32(lcd->lcd_last_result); + buf->lcd_last_data = cpu_to_le32(lcd->lcd_last_data); + buf->lcd_last_close_transno = cpu_to_le64(lcd->lcd_last_close_transno); + buf->lcd_last_close_xid = cpu_to_le64(lcd->lcd_last_close_xid); + buf->lcd_last_close_result = cpu_to_le32(lcd->lcd_last_close_result); + buf->lcd_last_close_data = cpu_to_le32(lcd->lcd_last_close_data); + buf->lcd_pre_versions[0] = cpu_to_le64(lcd->lcd_pre_versions[0]); + buf->lcd_pre_versions[1] = cpu_to_le64(lcd->lcd_pre_versions[1]); + buf->lcd_pre_versions[2] = cpu_to_le64(lcd->lcd_pre_versions[2]); + buf->lcd_pre_versions[3] = cpu_to_le64(lcd->lcd_pre_versions[3]); + buf->lcd_last_epoch = cpu_to_le32(lcd->lcd_last_epoch); + buf->lcd_first_epoch = cpu_to_le32(lcd->lcd_first_epoch); +} + +static inline __u64 lcd_last_transno(struct lsd_client_data *lcd) +{ + return (lcd->lcd_last_transno > lcd->lcd_last_close_transno ? + lcd->lcd_last_transno : lcd->lcd_last_close_transno); +} + +static inline __u64 lcd_last_xid(struct lsd_client_data *lcd) +{ + return (lcd->lcd_last_xid > lcd->lcd_last_close_xid ? 
+ lcd->lcd_last_xid : lcd->lcd_last_close_xid); +} /****************** superblock additional info *********************/ #ifdef __KERNEL__ @@ -311,7 +435,6 @@ struct lustre_mount_info { /****************** prototypes *********************/ #ifdef __KERNEL__ -#include /* obd_mount.c */ void lustre_register_client_fill_super(int (*cfs)(struct super_block *sb)); @@ -319,10 +442,6 @@ void lustre_register_kill_super_cb(void (*cfs)(struct super_block *sb)); int lustre_common_put_super(struct super_block *sb); -int lustre_process_log(struct super_block *sb, char *logname, - struct config_llog_instance *cfg); -int lustre_end_log(struct super_block *sb, char *logname, - struct config_llog_instance *cfg); struct lustre_mount_info *server_get_mount(const char *name); struct lustre_mount_info *server_get_mount_2(const char *name); int server_put_mount(const char *name, struct vfsmount *mnt); diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 6ee97d6..33b5248 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -42,24 +42,36 @@ #include #include -/* Data stored per client in the last_rcvd file. In le32 order. 
*/ struct mds_client_data; struct mdt_client_data; struct mds_idmap_table; struct mdt_idmap_table; +struct lu_export_data { + /** Protects led_lcd below */ + struct semaphore led_lcd_lock; + /** Per-client data for each export */ + struct lsd_client_data *led_lcd; + /** Offset of record in last_rcvd file */ + loff_t led_lr_off; + /** Client index in last_rcvd file */ + int led_lr_idx; +}; + struct mdt_export_data { + struct lu_export_data med_led; struct list_head med_open_head; spinlock_t med_open_lock; /* lock med_open_head, mfd_list*/ - struct semaphore med_lcd_lock; - struct lsd_client_data *med_lcd; __u64 med_ibits_known; - loff_t med_lr_off; - int med_lr_idx; struct semaphore med_idmap_sem; struct lustre_idmap_table *med_idmap; }; +#define med_lcd_lock med_led.led_lcd_lock +#define med_lcd med_led.led_lcd +#define med_lr_off med_led.led_lr_off +#define med_lr_idx med_led.led_lr_idx + struct osc_creator { spinlock_t oscc_lock; struct list_head oscc_list; @@ -82,10 +94,8 @@ struct ec_export_data { /* echo client */ /* In-memory access to client data from OST struct */ struct filter_export_data { - spinlock_t fed_lock; /* protects fed_open_head */ - struct lsd_client_data *fed_lcd; - loff_t fed_lr_off; - int fed_lr_idx; + struct lu_export_data fed_led; + spinlock_t fed_lock; /**< protects fed_mod_list */ long fed_dirty; /* in bytes */ long fed_grant; /* in bytes */ struct list_head fed_mod_list; /* files being modified */ @@ -94,6 +104,11 @@ struct filter_export_data { __u32 fed_group; }; +#define fed_lcd_lock fed_led.led_lcd_lock +#define fed_lcd fed_led.led_lcd +#define fed_lr_off fed_led.led_lr_off +#define fed_lr_idx fed_led.led_lr_idx + typedef struct nid_stat_uuid { struct list_head ns_uuid_list; struct obd_uuid ns_uuid; @@ -137,7 +152,10 @@ struct obd_export { lustre_hash_t *exp_lock_hash; /* existing lock hash */ spinlock_t exp_lock_hash_lock; struct list_head exp_outstanding_replies; - time_t exp_last_request_time; + struct list_head 
exp_uncommitted_replies; + spinlock_t exp_uncommitted_replies_lock; + __u64 exp_last_committed; + cfs_time_t exp_last_request_time; struct list_head exp_req_replay_queue; spinlock_t exp_lock; /* protects flags int below */ /* ^ protects exp_outstanding_replies too */ @@ -147,6 +165,10 @@ struct obd_export { exp_in_recovery:1, exp_disconnected:1, exp_connecting:1, + /** VBR: export missed recovery */ + exp_delayed:1, + /** VBR: failed version checking */ + exp_vbr_failed:1, exp_req_replay_needed:1, exp_lock_replay_needed:1, exp_need_sync:1, @@ -161,16 +183,25 @@ struct obd_export { cfs_time_t exp_flvr_expire[2]; /* seconds */ union { + struct lu_export_data eu_target_data; struct mdt_export_data eu_mdt_data; struct filter_export_data eu_filter_data; struct ec_export_data eu_ec_data; } u; }; +#define exp_target_data u.eu_target_data #define exp_mdt_data u.eu_mdt_data #define exp_filter_data u.eu_filter_data #define exp_ec_data u.eu_ec_data +static inline int exp_expired(struct obd_export *exp, cfs_duration_t age) +{ + LASSERT(exp->exp_delayed); + return cfs_time_before(cfs_time_add(exp->exp_last_request_time, age), + cfs_time_current_sec()); +} + static inline int exp_connect_cancelset(struct obd_export *exp) { LASSERT(exp != NULL); @@ -197,6 +228,13 @@ static inline int client_is_remote(struct obd_export *exp) OBD_CONNECT_RMT_CLIENT); } +static inline int exp_connect_vbr(struct obd_export *exp) +{ + LASSERT(exp != NULL); + LASSERT(exp->exp_connection); + return !!(exp->exp_connect_flags & OBD_CONNECT_VBR); +} + static inline int imp_connect_lru_resize(struct obd_import *imp) { struct obd_connect_data *ocd; diff --git a/lustre/include/lustre_import.h b/lustre/include/lustre_import.h index f3fd466..5f82e8f 100644 --- a/lustre/include/lustre_import.h +++ b/lustre/include/lustre_import.h @@ -164,6 +164,9 @@ struct obd_import { imp_server_timeout:1, /* use 1/2 timeout on MDS' OSCs */ imp_initial_recov:1, /* retry the initial connection */ imp_initial_recov_bk:1, /* turn 
off init_recov after trying all failover nids */ + imp_delayed_recovery:1, /* VBR: imp in delayed recovery */ + imp_no_lock_replay:1, /* VBR: if gap was found then no lock replays */ + imp_vbr_failed:1, /* recovery by versions was failed */ imp_force_verify:1, /* force an immidiate ping */ imp_pingable:1, /* pingable */ imp_resend_replay:1, /* resend for replay */ diff --git a/lustre/include/lustre_lib.h b/lustre/include/lustre_lib.h index c99450e..bbcdd91 100644 --- a/lustre/include/lustre_lib.h +++ b/lustre/include/lustre_lib.h @@ -62,9 +62,8 @@ void ll_get_random_bytes(void *buf, int size); /* target.c */ struct ptlrpc_request; -struct recovd_data; -struct recovd_obd; struct obd_export; +struct lu_target; #include #include #include @@ -89,8 +88,6 @@ int target_handle_dqacq_callback(struct ptlrpc_request *req); #define OBD_RECOVERY_MAX_TIME (obd_timeout * 18) /* b13079 */ void target_cancel_recovery_timer(struct obd_device *obd); -int target_start_recovery_thread(struct obd_device *obd, - svc_handler_t handler); void target_stop_recovery_thread(struct obd_device *obd); void target_cleanup_recovery(struct obd_device *obd); int target_queue_recovery_request(struct ptlrpc_request *req, diff --git a/lustre/include/lustre_log.h b/lustre/include/lustre_log.h index eddaeef..ad6e6ca 100644 --- a/lustre/include/lustre_log.h +++ b/lustre/include/lustre_log.h @@ -61,7 +61,7 @@ #error Unsupported operating system. #endif -#include +#include #include #include @@ -294,7 +294,7 @@ struct llog_commit_master { /** * The refcount for lcm */ - atomic_t lcm_refcount; + atomic_t lcm_refcount; /** * Thread control structure. Used for control commit thread. 
*/ @@ -313,7 +313,7 @@ struct llog_commit_master { char lcm_name[LCM_NAME_SIZE]; }; -static inline struct llog_commit_master +static inline struct llog_commit_master *lcm_get(struct llog_commit_master *lcm) { LASSERT(atomic_read(&lcm->lcm_refcount) > 0); @@ -321,13 +321,13 @@ static inline struct llog_commit_master return lcm; } -static inline void +static inline void lcm_put(struct llog_commit_master *lcm) { if (!atomic_dec_and_test(&lcm->lcm_refcount)) { return ; } - OBD_FREE_PTR(lcm); + OBD_FREE_PTR(lcm); } struct llog_canceld_ctxt { @@ -675,4 +675,9 @@ static inline int llog_connect(struct llog_ctxt *ctxt, RETURN(rc); } +int lustre_process_log(struct super_block *sb, char *logname, + struct config_llog_instance *cfg); +int lustre_end_log(struct super_block *sb, char *logname, + struct config_llog_instance *cfg); + #endif diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 70c3150..30bf3d8 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -1004,7 +1004,7 @@ struct ptlrpc_service_conf { /* ptlrpc/service.c */ void ptlrpc_save_lock (struct ptlrpc_request *req, struct lustre_handle *lock, int mode, int no_ack); -void ptlrpc_commit_replies (struct obd_device *obd); +void ptlrpc_commit_replies(struct obd_export *exp); void ptlrpc_dispatch_difficult_reply (struct ptlrpc_reply_state *rs); void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs); struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c, @@ -1105,6 +1105,7 @@ void lustre_msg_add_version(struct lustre_msg *msg, int version); __u32 lustre_msg_get_opc(struct lustre_msg *msg); __u64 lustre_msg_get_last_xid(struct lustre_msg *msg); __u64 lustre_msg_get_last_committed(struct lustre_msg *msg); +__u64 *lustre_msg_get_versions(struct lustre_msg *msg); __u64 lustre_msg_get_transno(struct lustre_msg *msg); __u64 lustre_msg_get_slv(struct lustre_msg *msg); __u32 lustre_msg_get_limit(struct lustre_msg *msg); @@ -1123,6 +1124,7 @@ void 
lustre_msg_set_type(struct lustre_msg *msg, __u32 type); void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc); void lustre_msg_set_last_xid(struct lustre_msg *msg, __u64 last_xid); void lustre_msg_set_last_committed(struct lustre_msg *msg,__u64 last_committed); +void lustre_msg_set_versions(struct lustre_msg *msg, __u64 *versions); void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno); void lustre_msg_set_status(struct lustre_msg *msg, __u32 status); void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt); @@ -1271,6 +1273,7 @@ int client_import_add_conn(struct obd_import *imp, struct obd_uuid *uuid, int priority); int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid); int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid); +void client_destroy_import(struct obd_import *imp); /* ptlrpc/pinger.c */ enum timeout_event { diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 8832552..0a16b78 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -56,7 +56,7 @@ /* * super-class definitions. 
*/ -#include +#include #include struct md_device; @@ -246,7 +246,10 @@ struct md_object_operations { struct lustre_capa *, int renewal); int (*moo_object_sync)(const struct lu_env *, struct md_object *); - + dt_obj_version_t (*moo_version_get)(const struct lu_env *, + struct md_object *); + void (*moo_version_set)(const struct lu_env *, struct md_object *, + dt_obj_version_t); int (*moo_path)(const struct lu_env *env, struct md_object *obj, char *path, int pathlen, __u64 *recno, int *linkno); }; @@ -704,6 +707,20 @@ static inline int mo_object_sync(const struct lu_env *env, struct md_object *m) return m->mo_ops->moo_object_sync(env, m); } +static inline dt_obj_version_t mo_version_get(const struct lu_env *env, + struct md_object *m) +{ + LASSERT(m->mo_ops->moo_version_get); + return m->mo_ops->moo_version_get(env, m); +} + +static inline void mo_version_set(const struct lu_env *env, + struct md_object *m, dt_obj_version_t ver) +{ + LASSERT(m->mo_ops->moo_version_set); + return m->mo_ops->moo_version_set(env, m, ver); +} + static inline int mdo_lookup(const struct lu_env *env, struct md_object *p, const struct lu_name *lname, diff --git a/lustre/include/obd.h b/lustre/include/obd.h index dc7745e..a894998 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -60,7 +60,7 @@ #define IOC_MDC_MAX_NR 50 #include -#include +#include #include #include #include @@ -247,6 +247,20 @@ struct ost_server_data; /* hold common fields for "target" device */ struct obd_device_target { struct super_block *obt_sb; + /** last_rcvd file */ + struct file *obt_rcvd_filp; + /** server data in last_rcvd file */ + struct lr_server_data *obt_lsd; + /** Lock protecting client bitmap */ + spinlock_t obt_client_bitmap_lock; + /** Bitmap of known clients */ + unsigned long *obt_client_bitmap; + /** Server last transaction number */ + __u64 obt_last_transno; + /** Lock protecting last transaction number */ + spinlock_t obt_translock; + /** Number of mounts */ + __u64 obt_mount_count; 
atomic_t obt_quotachecking; struct lustre_quota_ctxt obt_qctxt; lustre_quota_version_t obt_qfmt; @@ -288,6 +302,7 @@ struct filter_ext { struct filter_obd { /* NB this field MUST be first */ struct obd_device_target fo_obt; + struct lu_target fo_lut; const char *fo_fstype; struct vfsmount *fo_vfsmnt; @@ -300,12 +315,7 @@ struct filter_obd { spinlock_t fo_objidlock; /* protect fo_lastobjid */ - spinlock_t fo_translock; /* protect fsd_last_transno */ - struct file *fo_rcvd_filp; struct file *fo_health_check_filp; - struct lr_server_data *fo_fsd; - unsigned long *fo_last_rcvd_slots; - __u64 fo_mount_count; unsigned long fo_destroys_in_progress; struct semaphore fo_create_locks[FILTER_SUBDIR_COUNT]; @@ -372,6 +382,12 @@ struct filter_obd { int fo_sec_level; }; +#define fo_translock fo_obt.obt_translock +#define fo_rcvd_filp fo_obt.obt_rcvd_filp +#define fo_fsd fo_obt.obt_lsd +#define fo_last_rcvd_slots fo_obt.obt_client_bitmap +#define fo_mount_count fo_obt.obt_mount_count + struct timeout_item { enum timeout_event ti_event; cfs_time_t ti_timeout; @@ -380,6 +396,7 @@ struct timeout_item { struct list_head ti_obd_list; struct list_head ti_chain; }; + #define OSC_MAX_RIF_DEFAULT 8 #define OSC_MAX_RIF_MAX 256 #define OSC_MAX_DIRTY_DEFAULT (OSC_MAX_RIF_DEFAULT * 4) @@ -516,15 +533,10 @@ struct mds_obd { cfs_dentry_t *mds_fid_de; int mds_max_mdsize; int mds_max_cookiesize; - struct file *mds_rcvd_filp; - spinlock_t mds_transno_lock; - __u64 mds_last_transno; - __u64 mds_mount_count; __u64 mds_io_epoch; unsigned long mds_atime_diff; struct semaphore mds_epoch_sem; struct ll_fid mds_rootfid; - struct lr_server_data *mds_server_data; cfs_dentry_t *mds_pending_dir; cfs_dentry_t *mds_logs_dir; cfs_dentry_t *mds_objects_dir; @@ -548,8 +560,6 @@ struct mds_obd { __u32 mds_lov_objid_lastidx; struct file *mds_health_check_filp; - unsigned long *mds_client_bitmap; -// struct upcall_cache *mds_group_hash; struct lustre_quota_info mds_quota_info; struct semaphore mds_qonoff_sem; @@ 
-570,6 +580,13 @@ struct mds_obd { struct rw_semaphore mds_notify_lock; }; +#define mds_transno_lock mds_obt.obt_translock +#define mds_rcvd_filp mds_obt.obt_rcvd_filp +#define mds_server_data mds_obt.obt_lsd +#define mds_client_bitmap mds_obt.obt_client_bitmap +#define mds_mount_count mds_obt.obt_mount_count +#define mds_last_transno mds_obt.obt_last_transno + /* lov objid */ extern __u32 mds_max_ost_index; @@ -829,6 +846,8 @@ struct obd_trans_info { /* initial thread handling transaction */ struct ptlrpc_thread * oti_thread; __u32 oti_conn_cnt; + /** VBR: versions */ + __u64 oti_pre_version; struct obd_uuid *oti_ost_uuid; }; @@ -844,7 +863,15 @@ static inline void oti_init(struct obd_trans_info *oti, return; oti->oti_xid = req->rq_xid; + /** VBR: take versions from request */ + if (req->rq_reqmsg != NULL && + lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) { + __u64 *pre_version = lustre_msg_get_versions(req->rq_reqmsg); + oti->oti_pre_version = pre_version ? pre_version[0] : 0; + oti->oti_transno = lustre_msg_get_transno(req->rq_reqmsg); + } + /** called from mds_create_objects */ if (req->rq_repmsg != NULL) oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg); oti->oti_thread = req->rq_svc_thread; @@ -990,7 +1017,8 @@ struct obd_device { unsigned long obd_attached:1, /* finished attach */ obd_set_up:1, /* finished setup */ obd_recovering:1, /* there are recoverable clients */ - obd_abort_recovery:1,/* somebody ioctl'ed us to abort */ + obd_abort_recovery:1,/* recovery expired */ + obd_version_recov:1, /* obd uses version checking */ obd_replayable:1, /* recovery is enabled; inform clients */ obd_no_transno:1, /* no committed-transno notification */ obd_no_recov:1, /* fail instead of retry messages */ @@ -1013,6 +1041,7 @@ struct obd_device { atomic_t obd_refcount; cfs_waitq_t obd_refcount_waitq; struct list_head obd_exports; + struct list_head obd_delayed_exports; int obd_num_exports; spinlock_t obd_nid_lock; struct ldlm_namespace *obd_namespace; @@ 
-1041,13 +1070,12 @@ struct obd_device { int obd_max_recoverable_clients; int obd_connected_clients; int obd_recoverable_clients; + int obd_delayed_clients; spinlock_t obd_processing_task_lock; /* BH lock (timer) */ __u64 obd_next_recovery_transno; int obd_replayed_requests; int obd_requests_queued_for_recovery; cfs_waitq_t obd_next_transno_waitq; - struct list_head obd_uncommitted_replies; - spinlock_t obd_uncommitted_replies_lock; cfs_timer_t obd_recovery_timer; time_t obd_recovery_start; /* seconds */ time_t obd_recovery_end; /* seconds, for lprocfs_status */ @@ -1571,22 +1599,24 @@ int lvfs_check_io_health(struct obd_device *obd, struct file *file); #define OBD_CALC_STRIPE_END 2 static inline void obd_transno_commit_cb(struct obd_device *obd, __u64 transno, - int error) + struct obd_export *exp, int error) { if (error) { CERROR("%s: transno "LPU64" commit error: %d\n", obd->obd_name, transno, error); return; } - if (transno > obd->obd_last_committed) { - CDEBUG(D_INFO, "%s: transno "LPD64" committed\n", + if (exp && transno > exp->exp_last_committed) { + CDEBUG(D_HA, "%s: transno "LPU64" committed\n", obd->obd_name, transno); - obd->obd_last_committed = transno; - ptlrpc_commit_replies (obd); + exp->exp_last_committed = transno; + ptlrpc_commit_replies(exp); } else { - CDEBUG(D_INFO, "%s: transno "LPD64" committed\n", + CDEBUG(D_INFO, "%s: transno "LPU64" committed\n", obd->obd_name, transno); } + if (transno > obd->obd_last_committed) + obd->obd_last_committed = transno; } static inline void init_obd_quota_ops(quota_interface_t *interface, diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 7096eaa..75bc6d2 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -327,6 +327,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_TGT_REPLAY_DROP 0x707 #define OBD_FAIL_TGT_FAKE_EXP 0x708 #define OBD_FAIL_TGT_REPLAY_DELAY 0x709 +#define OBD_FAIL_TGT_LAST_REPLAY 0x710 #define 
OBD_FAIL_MDC_REVALIDATE_PAUSE 0x800 #define OBD_FAIL_MDC_ENQUEUE_PAUSE 0x801 diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 2c498dc..ac048ee 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -188,7 +188,7 @@ out: RETURN(rc); } -static void destroy_import(struct obd_import *imp) +void client_destroy_import(struct obd_import *imp) { /* drop security policy instance after all rpc finished/aborted * to let all busy contexts be released. */ @@ -539,7 +539,7 @@ int client_disconnect_export(struct obd_export *exp) ptlrpc_free_rq_pool(imp->imp_rq_pool); imp->imp_rq_pool = NULL; } - destroy_import(imp); + client_destroy_import(imp); cli->cl_import = NULL; EXIT; @@ -962,7 +962,7 @@ dont_check_exports: atomic_inc(&target->obd_lock_replay_clients); if (target->obd_connected_clients == target->obd_max_recoverable_clients) - wake_up(&target->obd_next_transno_waitq); + cfs_waitq_signal(&target->obd_next_transno_waitq); } spin_unlock_bh(&target->obd_processing_task_lock); tmp = req_capsule_client_get(&req->rq_pill, &RMF_CONN); @@ -986,7 +986,7 @@ dont_check_exports: */ sptlrpc_import_inval_all_ctx(export->exp_imp_reverse); - destroy_import(export->exp_imp_reverse); + client_destroy_import(export->exp_imp_reverse); } /* for the rest part, we return -ENOTCONN in case of errors @@ -1055,7 +1055,7 @@ void target_destroy_export(struct obd_export *exp) /* exports created from last_rcvd data, and "fake" exports created by lctl don't have an import */ if (exp->exp_imp_reverse != NULL) - destroy_import(exp->exp_imp_reverse); + client_destroy_import(exp->exp_imp_reverse); /* We cancel locks at disconnect time, but this will catch any locks * granted in a race with recovery-induced disconnect. 
*/ @@ -1189,7 +1189,7 @@ static void target_finish_recovery(struct obd_device *obd) list_empty(&obd->obd_final_req_queue)) { obd->obd_processing_task = 0; } else { - CERROR("%s: Recovery queues ( %s%s%s) are empty\n", + CERROR("%s: Recovery queues ( %s%s%s) are not empty\n", obd->obd_name, list_empty(&obd->obd_req_replay_queue) ? "" : "req ", list_empty(&obd->obd_lock_replay_queue) ? "" : "lock ", @@ -1465,7 +1465,8 @@ static int check_for_next_transno(struct obd_device *obd) * to replay requests that demand on already committed ones * also, we can replay first non-committed transation */ LASSERT(req_transno != 0); - if (req_transno == obd->obd_last_committed + 1) { + if (obd->obd_version_recov || + req_transno == obd->obd_last_committed + 1) { obd->obd_next_recovery_transno = req_transno; } else if (req_transno > obd->obd_last_committed) { /* can't continue recovery: have no needed transno */ @@ -1564,6 +1565,11 @@ static struct ptlrpc_request *target_next_final_ping(struct obd_device *obd) req = list_entry(obd->obd_final_req_queue.next, struct ptlrpc_request, rq_list); list_del_init(&req->rq_list); + if (req->rq_export->exp_in_recovery) { + spin_lock(&req->rq_export->exp_lock); + req->rq_export->exp_in_recovery = 0; + spin_unlock(&req->rq_export->exp_lock); + } } else { req = NULL; } @@ -1571,6 +1577,11 @@ static struct ptlrpc_request *target_next_final_ping(struct obd_device *obd) return req; } +static inline int req_vbr_done(struct obd_export *exp) +{ + return (exp->exp_vbr_failed == 0); +} + static inline int req_replay_done(struct obd_export *exp) { return (exp->exp_req_replay_needed == 0); @@ -1588,7 +1599,7 @@ static inline int connect_done(struct obd_export *exp) static int check_for_clients(struct obd_device *obd) { - if (obd->obd_abort_recovery) + if (obd->obd_abort_recovery || obd->obd_version_recov) return 1; LASSERT(obd->obd_connected_clients <= obd->obd_max_recoverable_clients); if (obd->obd_no_conn == 0 && @@ -1639,7 +1650,8 @@ static void 
resume_recovery_timer(struct obd_device *obd) static int target_recovery_thread(void *arg) { - struct obd_device *obd = arg; + struct lu_target *lut = arg; + struct obd_device *obd = lut->lut_obd; struct ptlrpc_request *req; struct target_recovery_data *trd = &obd->obd_recovery_data; struct l_wait_info lwi = { 0 }; @@ -1665,8 +1677,8 @@ static int target_recovery_thread(void *arg) env.le_ctx.lc_thread = thread; CERROR("%s: started recovery thread pid %d\n", obd->obd_name, - current->pid); - trd->trd_processing_task = current->pid; + cfs_curproc_pid()); + trd->trd_processing_task = cfs_curproc_pid(); obd->obd_recovering = 1; complete(&trd->trd_starting); @@ -1681,24 +1693,24 @@ static int target_recovery_thread(void *arg) spin_unlock_bh(&obd->obd_processing_task_lock); /* If some clients haven't connected in time, evict them */ - if (obd->obd_abort_recovery) { + if (obd->obd_connected_clients < obd->obd_max_recoverable_clients) { CWARN("Some clients haven't connect in time (%d/%d)," "evict them\n", obd->obd_connected_clients, obd->obd_max_recoverable_clients); - obd->obd_abort_recovery = obd->obd_stopping; - class_disconnect_stale_exports(obd, connect_done, - exp_flags_from_obd(obd) | + class_disconnect_stale_exports(obd, connect_done, + exp_flags_from_obd(obd) | OBD_OPT_ABORT_RECOV); } + /* next stage: replay requests */ delta = jiffies; obd->obd_req_replaying = 1; CDEBUG(D_INFO, "1: request replay stage - %d clients from t"LPU64"\n", - atomic_read(&obd->obd_req_replay_clients), - obd->obd_next_recovery_transno); + atomic_read(&obd->obd_req_replay_clients), + obd->obd_next_recovery_transno); resume_recovery_timer(obd); while ((req = target_next_replay_req(obd))) { - LASSERT(trd->trd_processing_task == current->pid); + LASSERT(trd->trd_processing_task == cfs_curproc_pid()); DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s", lustre_msg_get_transno(req->rq_reqmsg), libcfs_nid2str(req->rq_peer.nid)); @@ -1716,21 +1728,21 @@ static int target_recovery_thread(void *arg) 
/* If some clients haven't replayed requests in time, evict them */ if (obd->obd_abort_recovery) { - CDEBUG(D_ERROR, "req replay timed out, aborting ...\n"); - obd->obd_abort_recovery = obd->obd_stopping; - class_disconnect_stale_exports(obd, req_replay_done, - exp_flags_from_obd(obd) | + CDEBUG(D_WARNING, "req replay is aborted\n"); + class_disconnect_stale_exports(obd, req_replay_done, + exp_flags_from_obd(obd) | OBD_OPT_ABORT_RECOV); abort_req_replay_queue(obd); } + LASSERT(list_empty(&obd->obd_req_replay_queue)); /* The second stage: replay locks */ CDEBUG(D_INFO, "2: lock replay stage - %d clients\n", atomic_read(&obd->obd_lock_replay_clients)); resume_recovery_timer(obd); while ((req = target_next_replay_lock(obd))) { - LASSERT(trd->trd_processing_task == current->pid); - DEBUG_REQ(D_HA|D_WARNING, req, "processing lock from %s: ", + LASSERT(trd->trd_processing_task == cfs_curproc_pid()); + DEBUG_REQ(D_HA, req, "processing lock from %s: ", libcfs_nid2str(req->rq_peer.nid)); handle_recovery_req(thread, req, trd->trd_recovery_handler); @@ -1743,28 +1755,34 @@ static int target_recovery_thread(void *arg) /* If some clients haven't replayed requests in time, evict them */ if (obd->obd_abort_recovery) { int stale; - CERROR("lock replay timed out, aborting ...\n"); - obd->obd_abort_recovery = obd->obd_stopping; - stale = class_disconnect_stale_exports(obd, lock_replay_done, - exp_flags_from_obd(obd) | + CERROR("lock replay is aborted\n"); + stale = class_disconnect_stale_exports(obd, lock_replay_done, + exp_flags_from_obd(obd) | OBD_OPT_ABORT_RECOV); abort_lock_replay_queue(obd); } + LASSERT(list_empty(&obd->obd_lock_replay_queue)); + /* The third stage: reply on final pings */ + CDEBUG(D_INFO, "3: final stage - process recovery completion pings\n"); + /** Update server last boot epoch */ + lut_boot_epoch_update(lut); /* We drop recoverying flag to forward all new requests * to regular mds_handle() since now */ spin_lock_bh(&obd->obd_processing_task_lock); 
obd->obd_recovering = obd->obd_abort_recovery = 0; spin_unlock_bh(&obd->obd_processing_task_lock); - /* The third stage: reply on final pings */ - CDEBUG(D_INFO, "3: final stage - process recovery completion pings\n"); while ((req = target_next_final_ping(obd))) { - LASSERT(trd->trd_processing_task == current->pid); + LASSERT(trd->trd_processing_task == cfs_curproc_pid()); DEBUG_REQ(D_HA, req, "processing final ping from %s: ", libcfs_nid2str(req->rq_peer.nid)); handle_recovery_req(thread, req, trd->trd_recovery_handler); } + /* evict exports failed VBR */ + class_disconnect_stale_exports(obd, req_vbr_done, + exp_flags_from_obd(obd) | + OBD_OPT_ABORT_RECOV); delta = (jiffies - delta) / HZ; CDEBUG(D_INFO,"4: recovery completed in %lus - %d/%d reqs/locks\n", @@ -1784,8 +1802,10 @@ static int target_recovery_thread(void *arg) RETURN(rc); } -int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler) +static int target_start_recovery_thread(struct lu_target *lut, + svc_handler_t handler) { + struct obd_device *obd = lut->lut_obd; int rc = 0; struct target_recovery_data *trd = &obd->obd_recovery_data; @@ -1794,7 +1814,7 @@ int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler) init_completion(&trd->trd_finishing); trd->trd_recovery_handler = handler; - if (kernel_thread(target_recovery_thread, obd, 0) > 0) { + if (kernel_thread(target_recovery_thread, lut, 0) > 0) { wait_for_completion(&trd->trd_starting); LASSERT(obd->obd_recovering != 0); } else @@ -1810,7 +1830,7 @@ void target_stop_recovery_thread(struct obd_device *obd) struct target_recovery_data *trd = &obd->obd_recovery_data; CERROR("%s: Aborting recovery\n", obd->obd_name); obd->obd_abort_recovery = 1; - wake_up(&obd->obd_next_transno_waitq); + cfs_waitq_signal(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); wait_for_completion(&trd->trd_finishing); } else { @@ -1834,21 +1854,26 @@ static void target_recovery_expired(unsigned long 
castmeharder) obd->obd_name, obd->obd_recoverable_clients, cfs_time_current_sec()- obd->obd_recovery_start, obd->obd_connected_clients); + spin_lock_bh(&obd->obd_processing_task_lock); - if (obd->obd_recovering) - obd->obd_abort_recovery = 1; + obd->obd_version_recov = 1; + CDEBUG(D_INFO, "VBR is used for %d clients from t"LPU64"\n", + atomic_read(&obd->obd_req_replay_clients), + obd->obd_next_recovery_transno); cfs_waitq_signal(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); } -void target_recovery_init(struct obd_device *obd, svc_handler_t handler) +void target_recovery_init(struct lu_target *lut, svc_handler_t handler) { + struct obd_device *obd = lut->lut_obd; if (obd->obd_max_recoverable_clients == 0) return; CWARN("RECOVERY: service %s, %d recoverable clients, " "last_transno "LPU64"\n", obd->obd_name, obd->obd_max_recoverable_clients, obd->obd_last_committed); + LASSERT(obd->obd_stopping == 0); obd->obd_next_recovery_transno = obd->obd_last_committed + 1; obd->obd_recovery_start = 0; obd->obd_recovery_end = 0; @@ -1856,13 +1881,14 @@ void target_recovery_init(struct obd_device *obd, svc_handler_t handler) /* bz13079: this should be set to desired value for ost but not for mds */ obd->obd_recovery_max_time = OBD_RECOVERY_MAX_TIME; cfs_timer_init(&obd->obd_recovery_timer, target_recovery_expired, obd); - target_start_recovery_thread(obd, handler); + target_start_recovery_thread(lut, handler); } EXPORT_SYMBOL(target_recovery_init); #endif -int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req) +static int target_process_req_flags(struct obd_device *obd, + struct ptlrpc_request *req) { struct obd_export *exp = req->rq_export; LASSERT(exp != NULL); @@ -1879,7 +1905,6 @@ int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req) obd->obd_recoverable_clients--; if (atomic_read(&obd->obd_req_replay_clients) == 0) CDEBUG(D_HA, "all clients have replayed reqs\n"); - 
wake_up(&obd->obd_next_transno_waitq); } spin_unlock_bh(&obd->obd_processing_task_lock); } @@ -1895,7 +1920,6 @@ int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req) atomic_dec(&obd->obd_lock_replay_clients); if (atomic_read(&obd->obd_lock_replay_clients) == 0) CDEBUG(D_HA, "all clients have replayed locks\n"); - wake_up(&obd->obd_next_transno_waitq); } spin_unlock_bh(&obd->obd_processing_task_lock); } @@ -1909,7 +1933,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req, struct list_head *tmp; int inserted = 0; __u64 transno = lustre_msg_get_transno(req->rq_reqmsg); - ENTRY; if (obd->obd_recovery_data.trd_processing_task == cfs_curproc_pid()) { @@ -1927,6 +1950,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, RETURN(-ENOMEM); DEBUG_REQ(D_HA, req, "queue final req"); spin_lock_bh(&obd->obd_processing_task_lock); + cfs_waitq_signal(&obd->obd_next_transno_waitq); if (obd->obd_recovering) list_add_tail(&req->rq_list, &obd->obd_final_req_queue); else { @@ -1948,6 +1972,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, RETURN(-ENOMEM); DEBUG_REQ(D_HA, req, "queue lock replay req"); spin_lock_bh(&obd->obd_processing_task_lock); + cfs_waitq_signal(&obd->obd_next_transno_waitq); LASSERT(obd->obd_recovering); /* usually due to recovery abort */ if (!req->rq_export->exp_in_recovery) { @@ -1958,7 +1983,6 @@ int target_queue_recovery_request(struct ptlrpc_request *req, LASSERT(req->rq_export->exp_lock_replay_needed); list_add_tail(&req->rq_list, &obd->obd_lock_replay_queue); spin_unlock_bh(&obd->obd_processing_task_lock); - wake_up(&obd->obd_next_transno_waitq); RETURN(0); } @@ -2043,7 +2067,7 @@ int target_queue_recovery_request(struct ptlrpc_request *req, list_add_tail(&req->rq_list, &obd->obd_req_replay_queue); obd->obd_requests_queued_for_recovery++; - wake_up(&obd->obd_next_transno_waitq); + cfs_waitq_signal(&obd->obd_next_transno_waitq); spin_unlock_bh(&obd->obd_processing_task_lock); RETURN(0); 
} @@ -2156,21 +2180,18 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) rs->rs_transno = req->rq_transno; rs->rs_export = exp; - spin_lock(&obd->obd_uncommitted_replies_lock); - + spin_lock(&exp->exp_uncommitted_replies_lock); CDEBUG(D_NET, "rs transno = "LPU64", last committed = "LPU64"\n", - rs->rs_transno, obd->obd_last_committed); - if (rs->rs_transno > obd->obd_last_committed) { + rs->rs_transno, exp->exp_last_committed); + if (rs->rs_transno > exp->exp_last_committed) { /* not committed already */ - list_add_tail (&rs->rs_obd_list, - &obd->obd_uncommitted_replies); + list_add_tail(&rs->rs_obd_list, + &exp->exp_uncommitted_replies); } + spin_unlock (&exp->exp_uncommitted_replies_lock); - spin_unlock (&obd->obd_uncommitted_replies_lock); - spin_lock (&exp->exp_lock); - - list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies); - + spin_lock(&exp->exp_lock); + list_add_tail(&rs->rs_exp_list, &exp->exp_outstanding_replies); spin_unlock(&exp->exp_lock); netrc = target_send_reply_msg (req, rc, fail_id); @@ -2191,7 +2212,7 @@ void target_send_reply(struct ptlrpc_request *req, int rc, int fail_id) } spin_lock(&rs->rs_lock); - if (rs->rs_transno <= obd->obd_last_committed || + if (rs->rs_transno <= exp->exp_last_committed || (!rs->rs_on_net && !rs->rs_no_ack) || list_empty(&rs->rs_exp_list) || /* completed already */ list_empty(&rs->rs_obd_list)) { @@ -2214,26 +2235,19 @@ int target_handle_ping(struct ptlrpc_request *req) void target_committed_to_req(struct ptlrpc_request *req) { - struct obd_device *obd; - - if (req == NULL || req->rq_export == NULL) - return; - - obd = req->rq_export->exp_obd; - if (obd == NULL) - return; + struct obd_export *exp = req->rq_export; - if (!obd->obd_no_transno && req->rq_repmsg != NULL) + if (!exp->exp_obd->obd_no_transno && req->rq_repmsg != NULL) lustre_msg_set_last_committed(req->rq_repmsg, - obd->obd_last_committed); + exp->exp_last_committed); else DEBUG_REQ(D_IOCTL, req, "not sending 
last_committed update (%d/" - "%d)", obd->obd_no_transno, req->rq_repmsg == NULL); + "%d)", exp->exp_obd->obd_no_transno, + req->rq_repmsg == NULL); CDEBUG(D_INFO, "last_committed "LPU64", transno "LPU64", xid "LPU64"\n", - obd->obd_last_committed, req->rq_transno, req->rq_xid); + exp->exp_last_committed, req->rq_transno, req->rq_xid); } - EXPORT_SYMBOL(target_committed_to_req); int target_handle_qc_callback(struct ptlrpc_request *req) diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index ae31ec6..3aabe5e 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -2557,7 +2557,6 @@ EXPORT_SYMBOL(client_obd_setup); EXPORT_SYMBOL(client_obd_cleanup); EXPORT_SYMBOL(client_connect_import); EXPORT_SYMBOL(client_disconnect_export); -EXPORT_SYMBOL(target_start_recovery_thread); EXPORT_SYMBOL(target_stop_recovery_thread); EXPORT_SYMBOL(target_handle_connect); EXPORT_SYMBOL(target_cleanup_recovery); diff --git a/lustre/liblustre/llite_lib.c b/lustre/liblustre/llite_lib.c index 5a34a82..7baec8c 100644 --- a/lustre/liblustre/llite_lib.c +++ b/lustre/liblustre/llite_lib.c @@ -177,7 +177,7 @@ int liblustre_process_log(struct config_llog_instance *cfg, GOTO(out_cleanup, rc = -ENOMEM); ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_FID | - OBD_CONNECT_AT; + OBD_CONNECT_AT | OBD_CONNECT_VBR; #ifdef LIBLUSTRE_POSIX_ACL ocd->ocd_connect_flags |= OBD_CONNECT_ACL; #endif diff --git a/lustre/liblustre/super.c b/lustre/liblustre/super.c index 5e8acf3..0234909 100644 --- a/lustre/liblustre/super.c +++ b/lustre/liblustre/super.c @@ -2017,7 +2017,8 @@ llu_fsswop_mount(const char *source, sizeof(async), &async, NULL); ocd.ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_VERSION | - OBD_CONNECT_FID | OBD_CONNECT_AT; + OBD_CONNECT_FID | OBD_CONNECT_AT | + OBD_CONNECT_VBR; #ifdef LIBLUSTRE_POSIX_ACL ocd.ocd_connect_flags |= OBD_CONNECT_ACL; #endif diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index b89b895..e780458 100644 --- 
a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -196,7 +196,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt) OBD_CONNECT_VERSION | OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET| OBD_CONNECT_FID | OBD_CONNECT_AT | - OBD_CONNECT_LOV_V3 | OBD_CONNECT_RMT_CLIENT; + OBD_CONNECT_LOV_V3 | OBD_CONNECT_RMT_CLIENT | + OBD_CONNECT_VBR; #ifdef HAVE_LRU_RESIZE_SUPPORT if (sbi->ll_flags & LL_SBI_LRU_RESIZE) @@ -339,7 +340,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt) OBD_CONNECT_CANCELSET | OBD_CONNECT_FID | OBD_CONNECT_SRVLOCK | OBD_CONNECT_TRUNCLOCK| OBD_CONNECT_AT | OBD_CONNECT_RMT_CLIENT | - OBD_CONNECT_OSS_CAPA | OBD_CONNECT_GRANT_SHRINK; + OBD_CONNECT_OSS_CAPA | OBD_CONNECT_VBR| + OBD_CONNECT_GRANT_SHRINK; if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) { /* OBD_CONNECT_CKSUM should always be set, even if checksums are diff --git a/lustre/lvfs/fsfilt_ext3.c b/lustre/lvfs/fsfilt_ext3.c index d69e284..f24135d 100644 --- a/lustre/lvfs/fsfilt_ext3.c +++ b/lustre/lvfs/fsfilt_ext3.c @@ -146,6 +146,8 @@ static char *fsfilt_ext3_uuid(struct super_block *sb) */ static __u64 fsfilt_ext3_get_version(struct inode *inode) { + CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n", + EXT3_I(inode)->i_fs_version, inode->i_ino); return EXT3_I(inode)->i_fs_version; } @@ -156,7 +158,12 @@ static __u64 fsfilt_ext3_set_version(struct inode *inode, __u64 new_version) { __u64 old_version = EXT3_I(inode)->i_fs_version; + CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n", + new_version, old_version, inode->i_ino); (EXT3_I(inode))->i_fs_version = new_version; + /* version is set after all inode operations are finished, so we should + * mark it dirty here */ + inode->i_sb->s_op->dirty_inode(inode); return old_version; } diff --git a/lustre/mdd/mdd_device.c b/lustre/mdd/mdd_device.c index 8586475..b815d12 100644 --- a/lustre/mdd/mdd_device.c +++ b/lustre/mdd/mdd_device.c @@ 
-84,6 +84,7 @@ static int mdd_device_init(const struct lu_env *env, struct lu_device *d, mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb; mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb; mdd->mdd_txn_cb.dtc_cookie = mdd; + mdd->mdd_txn_cb.dtc_tag = LCT_MD_THREAD; CFS_INIT_LIST_HEAD(&mdd->mdd_txn_cb.dtc_linkage); mdd->mdd_atime_diff = MAX_ATIME_DIFF; diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 0a86597..970ef8b 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -2231,6 +2231,24 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj) return next->do_ops->do_object_sync(env, next); } +static dt_obj_version_t mdd_version_get(const struct lu_env *env, + struct md_object *obj) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + LASSERT(mdd_object_exists(mdd_obj)); + return do_version_get(env, mdd_object_child(mdd_obj)); +} + +static void mdd_version_set(const struct lu_env *env, struct md_object *obj, + dt_obj_version_t version) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + + LASSERT(mdd_object_exists(mdd_obj)); + return do_version_set(env, mdd_object_child(mdd_obj), version); +} + const struct md_object_operations mdd_obj_ops = { .moo_permission = mdd_permission, .moo_attr_get = mdd_attr_get, @@ -2248,5 +2266,7 @@ const struct md_object_operations mdd_obj_ops = { .moo_readlink = mdd_readlink, .moo_capa_get = mdd_capa_get, .moo_object_sync = mdd_object_sync, + .moo_version_get = mdd_version_get, + .moo_version_set = mdd_version_set, .moo_path = mdd_path, }; diff --git a/lustre/mdt/mdt_capa.c b/lustre/mdt/mdt_capa.c index 1f03d81..3592985 100644 --- a/lustre/mdt/mdt_capa.c +++ b/lustre/mdt/mdt_capa.c @@ -99,9 +99,9 @@ static int write_capa_keys(const struct lu_env *env, for (i = 0; i < 2; i++) { lck_cpu_to_le(tmp, &keys[i]); - rc = mdt_record_write(env, mdt->mdt_ck_obj, - mdt_buf_const(env, tmp, sizeof(*tmp)), - &off, th); + rc = dt_record_write(env, mdt->mdt_ck_obj, + mdt_buf_const(env, tmp, 
sizeof(*tmp)), + &off, th); if (rc) break; } @@ -125,8 +125,8 @@ static int read_capa_keys(const struct lu_env *env, tmp = &mti->mti_capa_key; for (i = 0; i < 2; i++) { - rc = mdt_record_read(env, mdt->mdt_ck_obj, - mdt_buf(env, tmp, sizeof(*tmp)), &off); + rc = dt_record_read(env, mdt->mdt_ck_obj, + mdt_buf(env, tmp, sizeof(*tmp)), &off); if (rc) return rc; diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index d84f3f6..0099f50 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -2652,6 +2652,10 @@ static void mdt_thread_info_init(struct ptlrpc_request *req, info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET; info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg); + info->mti_mos[0] = NULL; + info->mti_mos[1] = NULL; + info->mti_mos[2] = NULL; + info->mti_mos[3] = NULL; memset(&info->mti_attr, 0, sizeof(info->mti_attr)); info->mti_body = NULL; @@ -3355,7 +3359,8 @@ static int mdt_intent_reint(enum mdt_it_code opcode, rep->lock_policy_res2 = clear_serious(rc); lhc->mlh_reg_lh.cookie = 0ull; - if (rc == -ENOTCONN || rc == -ENODEV) { + if (rc == -ENOTCONN || rc == -ENODEV || + rc == -EOVERFLOW) { /**< if VBR failure then return error */ /* * If it is the disconnect error (ENODEV & ENOCONN), the error * will be returned by rq_status, and client at ptlrpc layer @@ -4298,6 +4303,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) int waited = 0; ENTRY; + target_recovery_fini(obd); /* At this point, obd exports might still be on the "obd_zombie_exports" * list, and obd_zombie_impexp_thread() is trying to destroy them. 
* We wait a little bit until all exports (except the self-export) @@ -4321,7 +4327,6 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) ping_evictor_stop(); - target_recovery_fini(obd); mdt_stop_ptlrpc_service(m); mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT); mdt_obd_llog_cleanup(obd); @@ -4329,6 +4334,7 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m) #ifdef HAVE_QUOTA_SUPPORT next->md_ops->mdo_quota.mqo_cleanup(env, next); #endif + lut_fini(env, &m->mdt_lut); mdt_fs_cleanup(env, m); upcall_cache_cleanup(m->mdt_identity_cache); m->mdt_identity_cache = NULL; @@ -4597,10 +4603,14 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, GOTO(err_fini_proc, rc); } - rc = mdt_fld_init(env, obd->obd_name, m); + rc = lut_init(env, &m->mdt_lut, obd, m->mdt_bottom); if (rc) GOTO(err_fini_stack, rc); + rc = mdt_fld_init(env, obd->obd_name, m); + if (rc) + GOTO(err_lut, rc); + rc = mdt_seq_init(env, obd->obd_name, m); if (rc) GOTO(err_fini_fld, rc); @@ -4660,7 +4670,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, server_put_mount_2(dev, lmi->lmi_mnt); lmi = NULL; - target_recovery_init(obd, mdt_recovery_handle); + target_recovery_init(&m->mdt_lut, mdt_recovery_handle); rc = mdt_start_ptlrpc_service(m); if (rc) @@ -4710,6 +4720,8 @@ err_fini_seq: mdt_seq_fini(env, m); err_fini_fld: mdt_fld_fini(env, m); +err_lut: + lut_fini(env, &m->mdt_lut); err_fini_stack: mdt_stack_fini(env, m, md2lu_dev(m->mdt_child)); err_fini_proc: diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index c8f215c..953ede9 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -60,29 +60,18 @@ * struct lustre_handle */ #include +#include +#include #include -#include #include #include #include -/* LR_CLIENT_SIZE, etc. 
*/ -#include #include #include #include #include #include -static inline __u64 lcd_last_transno(struct lsd_client_data *lcd) -{ - return max(lcd->lcd_last_transno, lcd->lcd_last_close_transno); -} - -static inline __u64 lcd_last_xid(struct lsd_client_data *lcd) -{ - return max(lcd->lcd_last_xid, lcd->lcd_last_close_xid); -} - /* check if request's xid is equal to last one or not*/ static inline int req_xid_is_last(struct ptlrpc_request *req) { @@ -120,6 +109,8 @@ struct mdt_device { /* underlying device */ struct md_device *mdt_child; struct dt_device *mdt_bottom; + /** target device */ + struct lu_target mdt_lut; /* * Options bit-fields. */ @@ -138,25 +129,13 @@ struct mdt_device { spinlock_t mdt_ioepoch_lock; __u64 mdt_ioepoch; - /* Transaction related stuff here */ - spinlock_t mdt_transno_lock; - __u64 mdt_last_transno; - /* transaction callbacks */ struct dt_txn_callback mdt_txn_cb; - /* last_rcvd file */ - struct dt_object *mdt_last_rcvd; /* these values should be updated from lov if necessary. * or should be placed somewhere else. 
*/ int mdt_max_mdsize; int mdt_max_cookiesize; - __u64 mdt_mount_count; - - /* last_rcvd data */ - struct lr_server_data mdt_lsd; - spinlock_t mdt_client_bitmap_lock; - unsigned long mdt_client_bitmap[(LR_MAX_CLIENTS >> 3) / sizeof(long)]; struct upcall_cache *mdt_identity_cache; @@ -188,6 +167,14 @@ struct mdt_device { int mdt_sec_level; }; +#define mdt_transno_lock mdt_lut.lut_translock +#define mdt_last_transno mdt_lut.lut_last_transno +#define mdt_last_rcvd mdt_lut.lut_last_rcvd +#define mdt_mount_count mdt_lut.lut_mount_count +#define mdt_lsd mdt_lut.lut_lsd +#define mdt_client_bitmap_lock mdt_lut.lut_client_bitmap_lock +#define mdt_client_bitmap mdt_lut.lut_client_bitmap + #define MDT_SERVICE_WATCHDOG_FACTOR (2000) #define MDT_ROCOMPAT_SUPP (OBD_ROCOMPAT_LOVOBJID) #define MDT_INCOMPAT_SUPP (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR) @@ -332,6 +319,9 @@ struct mdt_thread_info { */ struct mdt_reint_record mti_rr; + /** md objects included in operation */ + struct mdt_object *mti_mos[PTLRPC_NUM_VERSIONS]; + /* * Operation specification (currently create and lookup) */ @@ -382,6 +372,11 @@ struct mdt_thread_info { struct md_attr mti_tmp_attr; }; +#define mti_parent mti_mos[0] +#define mti_child mti_mos[1] +#define mti_parent1 mti_mos[2] +#define mti_child1 mti_mos[3] + typedef void (*mdt_cb_t)(const struct mdt_device *mdt, __u64 transno, void *data, int err); struct mdt_commit_cb { @@ -394,7 +389,6 @@ enum mdt_txn_op { MDT_TXN_LAST_RCVD_WRITE_OP, }; - /* * Info allocated per-transaction. 
*/ @@ -402,13 +396,13 @@ enum mdt_txn_op { struct mdt_txn_info { __u64 txi_transno; unsigned int txi_cb_count; - struct mdt_commit_cb txi_cb[MDT_MAX_COMMIT_CB]; + struct lut_commit_cb txi_cb[MDT_MAX_COMMIT_CB]; }; extern struct lu_context_key mdt_txn_key; static inline void mdt_trans_add_cb(const struct thandle *th, - mdt_cb_t cb_func, void *cb_data) + lut_cb_t cb_func, void *cb_data) { struct mdt_txn_info *txi; @@ -416,8 +410,8 @@ static inline void mdt_trans_add_cb(const struct thandle *th, LASSERT(txi->txi_cb_count < ARRAY_SIZE(txi->txi_cb)); /* add new callback */ - txi->txi_cb[txi->txi_cb_count].mdt_cb_func = cb_func; - txi->txi_cb[txi->txi_cb_count].mdt_cb_data = cb_data; + txi->txi_cb[txi->txi_cb_count].lut_cb_func = cb_func; + txi->txi_cb[txi->txi_cb_count].lut_cb_data = cb_data; txi->txi_cb_count++; } @@ -546,7 +540,7 @@ void mdt_reconstruct_generic(struct mdt_thread_info *mti, struct mdt_lock_handle *lhc); extern void target_recovery_fini(struct obd_device *obd); -extern void target_recovery_init(struct obd_device *obd, +extern void target_recovery_init(struct lu_target *lut, svc_handler_t handler); int mdt_fs_setup(const struct lu_env *, struct mdt_device *, struct obd_device *, struct lustre_sb_info *lsi); @@ -618,6 +612,7 @@ int mdt_check_ucred(struct mdt_thread_info *); int mdt_init_ucred(struct mdt_thread_info *, struct mdt_body *); int mdt_init_ucred_reint(struct mdt_thread_info *); void mdt_exit_ucred(struct mdt_thread_info *); +int mdt_version_get_check(struct mdt_thread_info *, int); /* mdt_idmap.c */ int mdt_init_sec_level(struct mdt_thread_info *); diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 0f29b77..2e077ce 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -1003,6 +1003,12 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) /* Not found and with MDS_OPEN_CREAT: let's create it. 
*/ mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE); + info->mti_mos[0] = parent; + info->mti_mos[1] = child; + result = mdt_version_get_check(info, 0); + if (result) + GOTO(out_child, result); + /* Let lower layers know what is lock mode on directory. */ info->mti_spec.sp_cr_mode = mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode); diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index c32ae5a..10794c3 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -78,38 +78,6 @@ const struct lu_buf *mdt_buf_const(const struct lu_env *env, return buf; } -int mdt_record_read(const struct lu_env *env, - struct dt_object *dt, struct lu_buf *buf, loff_t *pos) -{ - int rc; - - LASSERTF(dt != NULL, "dt is NULL when we want to read record\n"); - - rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA); - - if (rc == buf->lb_len) - rc = 0; - else if (rc >= 0) - rc = -EFAULT; - return rc; -} - -int mdt_record_write(const struct lu_env *env, - struct dt_object *dt, const struct lu_buf *buf, - loff_t *pos, struct thandle *th) -{ - int rc; - - LASSERTF(dt != NULL, "dt is NULL when we want to write record\n"); - LASSERT(th != NULL); - rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1); - if (rc == buf->lb_len) - rc = 0; - else if (rc >= 0) - rc = -EFAULT; - return rc; -} - static inline int mdt_trans_credit_get(const struct lu_env *env, struct mdt_device *mdt, enum mdt_txn_op op) @@ -166,61 +134,6 @@ void mdt_trans_stop(const struct lu_env *env, mdt->mdt_bottom->dd_ops->dt_trans_stop(env, th); } -/* last_rcvd handling */ -static inline void lsd_le_to_cpu(struct lr_server_data *buf, - struct lr_server_data *lsd) -{ - memcpy(lsd->lsd_uuid, buf->lsd_uuid, sizeof (lsd->lsd_uuid)); - lsd->lsd_last_transno = le64_to_cpu(buf->lsd_last_transno); - lsd->lsd_mount_count = le64_to_cpu(buf->lsd_mount_count); - lsd->lsd_feature_compat = le32_to_cpu(buf->lsd_feature_compat); - lsd->lsd_feature_rocompat = 
le32_to_cpu(buf->lsd_feature_rocompat); - lsd->lsd_feature_incompat = le32_to_cpu(buf->lsd_feature_incompat); - lsd->lsd_server_size = le32_to_cpu(buf->lsd_server_size); - lsd->lsd_client_start = le32_to_cpu(buf->lsd_client_start); - lsd->lsd_client_size = le16_to_cpu(buf->lsd_client_size); -} - -static inline void lsd_cpu_to_le(struct lr_server_data *lsd, - struct lr_server_data *buf) -{ - memcpy(buf->lsd_uuid, lsd->lsd_uuid, sizeof (lsd->lsd_uuid)); - buf->lsd_last_transno = cpu_to_le64(lsd->lsd_last_transno); - buf->lsd_mount_count = cpu_to_le64(lsd->lsd_mount_count); - buf->lsd_feature_compat = cpu_to_le32(lsd->lsd_feature_compat); - buf->lsd_feature_rocompat = cpu_to_le32(lsd->lsd_feature_rocompat); - buf->lsd_feature_incompat = cpu_to_le32(lsd->lsd_feature_incompat); - buf->lsd_server_size = cpu_to_le32(lsd->lsd_server_size); - buf->lsd_client_start = cpu_to_le32(lsd->lsd_client_start); - buf->lsd_client_size = cpu_to_le16(lsd->lsd_client_size); -} - -static inline void lcd_le_to_cpu(struct lsd_client_data *buf, - struct lsd_client_data *lcd) -{ - memcpy(lcd->lcd_uuid, buf->lcd_uuid, sizeof (lcd->lcd_uuid)); - lcd->lcd_last_transno = le64_to_cpu(buf->lcd_last_transno); - lcd->lcd_last_xid = le64_to_cpu(buf->lcd_last_xid); - lcd->lcd_last_result = le32_to_cpu(buf->lcd_last_result); - lcd->lcd_last_data = le32_to_cpu(buf->lcd_last_data); - lcd->lcd_last_close_transno = le64_to_cpu(buf->lcd_last_close_transno); - lcd->lcd_last_close_xid = le64_to_cpu(buf->lcd_last_close_xid); - lcd->lcd_last_close_result = le32_to_cpu(buf->lcd_last_close_result); -} - -static inline void lcd_cpu_to_le(struct lsd_client_data *lcd, - struct lsd_client_data *buf) -{ - memcpy(buf->lcd_uuid, lcd->lcd_uuid, sizeof (lcd->lcd_uuid)); - buf->lcd_last_transno = cpu_to_le64(lcd->lcd_last_transno); - buf->lcd_last_xid = cpu_to_le64(lcd->lcd_last_xid); - buf->lcd_last_result = cpu_to_le32(lcd->lcd_last_result); - buf->lcd_last_data = cpu_to_le32(lcd->lcd_last_data); - 
buf->lcd_last_close_transno = cpu_to_le64(lcd->lcd_last_close_transno); - buf->lcd_last_close_xid = cpu_to_le64(lcd->lcd_last_close_xid); - buf->lcd_last_close_result = cpu_to_le32(lcd->lcd_last_close_result); -} - static inline int mdt_last_rcvd_header_read(const struct lu_env *env, struct mdt_device *mdt) { @@ -230,9 +143,9 @@ static inline int mdt_last_rcvd_header_read(const struct lu_env *env, mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); mti->mti_off = 0; - rc = mdt_record_read(env, mdt->mdt_last_rcvd, - mdt_buf(env, &mti->mti_lsd, sizeof(mti->mti_lsd)), - &mti->mti_off); + rc = dt_record_read(env, mdt->mdt_last_rcvd, + mdt_buf(env, &mti->mti_lsd, sizeof(mti->mti_lsd)), + &mti->mti_off); if (rc == 0) lsd_le_to_cpu(&mti->mti_lsd, &mdt->mdt_lsd); @@ -244,13 +157,6 @@ static inline int mdt_last_rcvd_header_read(const struct lu_env *env, return rc; } -static void mdt_client_cb(const struct mdt_device *mdt, __u64 transno, - void *data, int err) -{ - struct obd_device *obd = mdt2obd_dev(mdt); - target_client_add_cb(obd, transno, data, err); -} - static inline int mdt_last_rcvd_header_write(const struct lu_env *env, struct mdt_device *mdt, int need_sync) @@ -276,12 +182,12 @@ static inline int mdt_last_rcvd_header_write(const struct lu_env *env, lsd_cpu_to_le(&mdt->mdt_lsd, &mti->mti_lsd); if (need_sync && mti->mti_exp) - mdt_trans_add_cb(th, mdt_client_cb, mti->mti_exp); + mdt_trans_add_cb(th, lut_cb_client, mti->mti_exp); - rc = mdt_record_write(env, mdt->mdt_last_rcvd, - mdt_buf_const(env, &mti->mti_lsd, - sizeof(mti->mti_lsd)), - &mti->mti_off, th); + rc = dt_record_write(env, mdt->mdt_last_rcvd, + mdt_buf_const(env, &mti->mti_lsd, + sizeof(mti->mti_lsd)), + &mti->mti_off, th); mdt_trans_stop(env, mdt, th); @@ -302,8 +208,8 @@ static int mdt_last_rcvd_read(const struct lu_env *env, mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key); tmp = &mti->mti_lcd; - rc = mdt_record_read(env, mdt->mdt_last_rcvd, - mdt_buf(env, tmp, sizeof(*tmp)), off); + rc = 
dt_record_read(env, mdt->mdt_last_rcvd, + mdt_buf(env, tmp, sizeof(*tmp)), off); if (rc == 0) lcd_le_to_cpu(tmp, lcd); @@ -344,8 +250,8 @@ static int mdt_last_rcvd_write(const struct lu_env *env, lcd_cpu_to_le(lcd, tmp); - rc = mdt_record_write(env, mdt->mdt_last_rcvd, - mdt_buf_const(env, tmp, sizeof(*tmp)), off, th); + rc = dt_record_write(env, mdt->mdt_last_rcvd, + mdt_buf_const(env, tmp, sizeof(*tmp)), off, th); CDEBUG(D_INFO, "write lcd @%d rc = %d:\n" "uuid = %s\n" @@ -440,6 +346,8 @@ static int mdt_clients_data_init(const struct lu_env *env, rc = mdt_client_add(env, mdt, cl_idx); /* can't fail existing */ LASSERTF(rc == 0, "rc = %d\n", rc); + /* VBR: set export last committed version */ + exp->exp_last_committed = last_transno; lcd = NULL; spin_lock(&exp->exp_lock); exp->exp_connecting = 0; @@ -578,7 +486,7 @@ static int mdt_server_data_init(const struct lu_env *env, lsd->lsd_mount_count = mdt->mdt_mount_count; /* save it, so mount count and last_transno is current */ - rc = mdt_server_data_update(env, mdt, (mti->mti_exp && + rc = mdt_server_data_update(env, mdt, (mti->mti_exp && mti->mti_exp->exp_need_sync)); if (rc) GOTO(err_client, rc); @@ -586,7 +494,7 @@ static int mdt_server_data_init(const struct lu_env *env, RETURN(0); err_client: - target_recovery_fini(obd); + class_disconnect_exports(obd); out: return rc; } @@ -671,13 +579,13 @@ int mdt_client_new(const struct lu_env *env, struct mdt_device *mdt) if (IS_ERR(th)) RETURN(PTR_ERR(th)); - /* + /* * Until this operations will be committed the sync is needed * for this export. This should be done _after_ starting the * transaction so that many connecting clients will not bring - * server down with lots of sync writes. + * server down with lots of sync writes. 
*/ - mdt_trans_add_cb(th, mdt_client_cb, mti->mti_exp); + mdt_trans_add_cb(th, lut_cb_client, mti->mti_exp); spin_lock(&mti->mti_exp->exp_lock); mti->mti_exp->exp_need_sync = 1; spin_unlock(&mti->mti_exp->exp_lock); @@ -813,11 +721,11 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt) GOTO(free, rc = PTR_ERR(th)); if (need_sync) { - /* + /* * Until this operations will be committed the sync - * is needed for this export. + * is needed for this export. */ - mdt_trans_add_cb(th, mdt_client_cb, exp); + mdt_trans_add_cb(th, lut_cb_client, exp); } mutex_down(&med->med_lcd_lock); @@ -836,10 +744,10 @@ int mdt_client_del(const struct lu_env *env, struct mdt_device *mdt) clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap); spin_unlock(&mdt->mdt_client_bitmap_lock); - /* + /* * Make sure the server's last_transno is up to date. Do this * after the client is freed so we know all the client's - * transactions have been committed. + * transactions have been committed. */ mdt_server_data_update(env, mdt, need_sync); @@ -863,7 +771,6 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti, loff_t off; int err; __s32 rc = th->th_result; - __u64 *transno_p; ENTRY; LASSERT(req); @@ -882,14 +789,25 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti, } off = med->med_lr_off; + LASSERT(ergo(mti->mti_transno == 0, rc != 0)); mutex_down(&med->med_lcd_lock); if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE || lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) { - transno_p = &lcd->lcd_last_close_transno; + if (mti->mti_transno != 0) + lcd->lcd_last_close_transno = mti->mti_transno; lcd->lcd_last_close_xid = req->rq_xid; lcd->lcd_last_close_result = rc; } else { - transno_p = &lcd->lcd_last_transno; + /* VBR: save versions in last_rcvd for reconstruct. 
*/ + __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg); + if (pre_versions) { + lcd->lcd_pre_versions[0] = pre_versions[0]; + lcd->lcd_pre_versions[1] = pre_versions[1]; + lcd->lcd_pre_versions[2] = pre_versions[2]; + lcd->lcd_pre_versions[3] = pre_versions[3]; + } + if (mti->mti_transno != 0) + lcd->lcd_last_transno = mti->mti_transno; lcd->lcd_last_xid = req->rq_xid; lcd->lcd_last_result = rc; /*XXX: save intent_disposition in mdt_thread_info? @@ -898,20 +816,6 @@ static int mdt_last_rcvd_update(struct mdt_thread_info *mti, lcd->lcd_last_data = mti->mti_opdata; } - /* - * When we store zero transno in lcd we can lost last transno value - * because lcd contains 0, but lsd is not yet written - * The server data should be updated also if the latest - * transno is rewritten by zero. See the bug 11125 for details. - */ - if (mti->mti_transno == 0 && - *transno_p == mdt->mdt_last_transno) - mdt_server_data_update(mti->mti_env, mdt, - (mti->mti_exp && - mti->mti_exp->exp_need_sync)); - - *transno_p = mti->mti_transno; - if (off <= 0) { CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off); err = -EINVAL; @@ -935,6 +839,17 @@ static int mdt_txn_start_cb(const struct lu_env *env, return 0; } +/* Set new object versions */ +static void mdt_versions_set(struct mdt_thread_info *info) +{ + int i; + for (i = 0; i < PTLRPC_NUM_VERSIONS; i++) + if (info->mti_mos[i] != NULL) + mo_version_set(info->mti_env, + mdt_object_child(info->mti_mos[i]), + info->mti_transno); +} + /* Update last_rcvd records with latests transaction data */ static int mdt_txn_stop_cb(const struct lu_env *env, struct thandle *txn, void *cookie) @@ -969,7 +884,6 @@ static int mdt_txn_stop_cb(const struct lu_env *env, if (mti->mti_transno != 0) { CERROR("Replay transno "LPU64" failed: rc %i\n", mti->mti_transno, txn->th_result); - mti->mti_transno = 0; } } else if (mti->mti_transno == 0) { mti->mti_transno = ++ mdt->mdt_last_transno; @@ -978,10 +892,14 @@ static int mdt_txn_stop_cb(const 
struct lu_env *env, if (mti->mti_transno > mdt->mdt_last_transno) mdt->mdt_last_transno = mti->mti_transno; } - + spin_unlock(&mdt->mdt_transno_lock); /* sometimes the reply message has not been successfully packed */ LASSERT(req != NULL && req->rq_repmsg != NULL); + /** VBR: set new versions */ + if (txn->th_result == 0) + mdt_versions_set(mti); + /* filling reply data */ CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n", mti->mti_transno, req->rq_export->exp_obd->obd_last_committed); @@ -992,7 +910,10 @@ static int mdt_txn_stop_cb(const struct lu_env *env, lcd_last_xid(req->rq_export->exp_mdt_data.med_lcd)); /* save transno for the commit callback */ txi->txi_transno = mti->mti_transno; - spin_unlock(&mdt->mdt_transno_lock); + + /* add separate commit callback for transaction handling because we need + * export as parameter */ + mdt_trans_add_cb(txn, lut_cb_last_committed, mti->mti_exp); return mdt_last_rcvd_update(mti, txn); } @@ -1002,29 +923,15 @@ static int mdt_txn_commit_cb(const struct lu_env *env, struct thandle *txn, void *cookie) { struct mdt_device *mdt = cookie; - struct obd_device *obd = mdt2obd_dev(mdt); struct mdt_txn_info *txi; int i; txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key); - /* copy of obd_transno_commit_cb() but with locking */ - spin_lock(&mdt->mdt_transno_lock); - if (txi->txi_transno > obd->obd_last_committed) { - obd->obd_last_committed = txi->txi_transno; - spin_unlock(&mdt->mdt_transno_lock); - ptlrpc_commit_replies(obd); - } else - spin_unlock(&mdt->mdt_transno_lock); - - if (txi->txi_transno) - CDEBUG(D_HA, "%s: transno "LPD64" is committed\n", - obd->obd_name, txi->txi_transno); - /* iterate through all additional callbacks */ for (i = 0; i < txi->txi_cb_count; i++) { - txi->txi_cb[i].mdt_cb_func(mdt, txi->txi_transno, - txi->txi_cb[i].mdt_cb_data, 0); + txi->txi_cb[i].lut_cb_func(&mdt->mdt_lut, txi->txi_transno, + txi->txi_cb[i].lut_cb_data, 0); } return 0; } @@ -1046,21 +953,14 @@ int mdt_fs_setup(const struct 
lu_env *env, struct mdt_device *mdt, mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb; mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb; mdt->mdt_txn_cb.dtc_cookie = mdt; + mdt->mdt_txn_cb.dtc_tag = LCT_MD_THREAD; CFS_INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage); dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb); - o = dt_store_open(env, mdt->mdt_bottom, "", LAST_RCVD, &fid); - if (!IS_ERR(o)) { - mdt->mdt_last_rcvd = o; - rc = mdt_server_data_init(env, mdt, lsi); - if (rc) - GOTO(put_last_rcvd, rc); - } else { - rc = PTR_ERR(o); - CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc); + rc = mdt_server_data_init(env, mdt, lsi); + if (rc) RETURN(rc); - } o = dt_store_open(env, mdt->mdt_bottom, "", CAPA_KEYS, &fid); if (!IS_ERR(o)) { @@ -1071,16 +971,15 @@ int mdt_fs_setup(const struct lu_env *env, struct mdt_device *mdt, } else { rc = PTR_ERR(o); CERROR("cannot open %s: rc = %d\n", CAPA_KEYS, rc); - GOTO(put_last_rcvd, rc); + GOTO(disconnect_exports, rc); } RETURN(0); put_ck_object: lu_object_put(env, &o->do_lu); mdt->mdt_ck_obj = NULL; -put_last_rcvd: - lu_object_put(env, &mdt->mdt_last_rcvd->do_lu); - mdt->mdt_last_rcvd = NULL; +disconnect_exports: + class_disconnect_exports(obd); return rc; } @@ -1090,9 +989,6 @@ void mdt_fs_cleanup(const struct lu_env *env, struct mdt_device *mdt) /* Remove transaction callback */ dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb); - if (mdt->mdt_last_rcvd) - lu_object_put(env, &mdt->mdt_last_rcvd->do_lu); - mdt->mdt_last_rcvd = NULL; if (mdt->mdt_ck_obj) lu_object_put(env, &mdt->mdt_ck_obj->do_lu); mdt->mdt_ck_obj = NULL; @@ -1151,6 +1047,20 @@ static void mdt_steal_ack_locks(struct ptlrpc_request *req) spin_unlock(&exp->exp_lock); } +/** + * VBR: restore versions + */ +void mdt_vbr_reconstruct(struct ptlrpc_request *req, + struct lsd_client_data *lcd) +{ + __u64 pre_versions[4] = {0}; + pre_versions[0] = lcd->lcd_pre_versions[0]; + pre_versions[1] = lcd->lcd_pre_versions[1]; + pre_versions[2] = lcd->lcd_pre_versions[2]; 
+ pre_versions[3] = lcd->lcd_pre_versions[3]; + lustre_msg_set_versions(req->rq_repmsg, pre_versions); +} + void mdt_req_from_lcd(struct ptlrpc_request *req, struct lsd_client_data *lcd) { @@ -1161,14 +1071,18 @@ void mdt_req_from_lcd(struct ptlrpc_request *req, lustre_msg_get_opc(req->rq_repmsg) == MDS_DONE_WRITING) { req->rq_transno = lcd->lcd_last_close_transno; req->rq_status = lcd->lcd_last_close_result; - lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); - lustre_msg_set_status(req->rq_repmsg, req->rq_status); } else { req->rq_transno = lcd->lcd_last_transno; req->rq_status = lcd->lcd_last_result; - lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); - lustre_msg_set_status(req->rq_repmsg, req->rq_status); + mdt_vbr_reconstruct(req, lcd); } + if (req->rq_status != 0) + req->rq_transno = 0; + lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); + lustre_msg_set_status(req->rq_repmsg, req->rq_status); + DEBUG_REQ(D_RPCTRACE, req, "restoring transno "LPD64"/status %d", + req->rq_transno, req->rq_status); + mdt_steal_ack_locks(req); } diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 4de1f39..26f923d 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -96,6 +96,51 @@ static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc, RETURN(rc); } +int mdt_version_get_check(struct mdt_thread_info *info, int index) +{ + /** version recovery */ + struct md_object *mo; + struct ptlrpc_request *req = mdt_info_req(info); + __u64 curr_version, *pre_versions; + ENTRY; + + if (!exp_connect_vbr(req->rq_export)) + RETURN(0); + + LASSERT(info->mti_mos[index]); + LASSERT(mdt_object_exists(info->mti_mos[index])); + mo = mdt_object_child(info->mti_mos[index]); + + curr_version = mo_version_get(info->mti_env, mo); + CDEBUG(D_INODE, "Version is "LPX64"\n", curr_version); + /** VBR: version is checked always because costs nothing */ + if (lustre_msg_get_transno(req->rq_reqmsg) != 0) { + pre_versions = 
lustre_msg_get_versions(req->rq_reqmsg); + LASSERT(index < PTLRPC_NUM_VERSIONS); + /** Sanity check for malformed buffers */ + if (pre_versions == NULL) { + CERROR("No versions in request buffer\n"); + spin_lock(&req->rq_export->exp_lock); + req->rq_export->exp_vbr_failed = 1; + spin_unlock(&req->rq_export->exp_lock); + RETURN(-EOVERFLOW); + } else if (pre_versions[index] != curr_version) { + CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n", + pre_versions[index], curr_version); + spin_lock(&req->rq_export->exp_lock); + req->rq_export->exp_vbr_failed = 1; + spin_unlock(&req->rq_export->exp_lock); + RETURN(-EOVERFLOW); + } + } + /** save pre-versions in reply */ + LASSERT(req->rq_repmsg != NULL); + pre_versions = lustre_msg_get_versions(req->rq_repmsg); + if (pre_versions) + pre_versions[index] = curr_version; + RETURN(0); +} + static int mdt_md_create(struct mdt_thread_info *info) { struct mdt_device *mdt = info->mti_mdt; @@ -136,6 +181,12 @@ static int mdt_md_create(struct mdt_thread_info *info) mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_CREATE_WRITE); + info->mti_mos[0] = parent; + info->mti_mos[1] = child; + rc = mdt_version_get_check(info, 0); + if (rc) + GOTO(out_put_child, rc); + /* Let lower layer know current lock mode. 
*/ info->mti_spec.sp_cr_mode = mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode); @@ -158,6 +209,7 @@ static int mdt_md_create(struct mdt_thread_info *info) mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(child)); } +out_put_child: mdt_object_put(info->mti_env, child); } else rc = PTR_ERR(child); @@ -227,6 +279,7 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags) struct md_attr *ma = &info->mti_attr; struct mdt_lock_handle *lh; int som_update = 0; + int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID); int rc; ENTRY; @@ -271,6 +324,14 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags) if (unlikely(ma->ma_attr.la_valid == LA_CTIME)) ma->ma_attr_flags |= MDS_VTX_BYPASS; + /* VBR: update version if the changed attributes are important for recovery */ + if (do_vbr) { + info->mti_mos[0] = mo; + rc = mdt_version_get_check(info, 0); + if (rc) + GOTO(out_unlock, rc); + } + /* all attrs are packed into mti_attr in unpack_setattr */ rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma); if (rc != 0) @@ -315,6 +376,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, if (IS_ERR(mo)) GOTO(out, rc = PTR_ERR(mo)); + /* start a log journal handle if needed */ if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) { if ((ma->ma_attr.la_valid & LA_SIZE) || (rr->rr_flags & MRF_SETATTR_LOCKED)) { @@ -497,6 +559,11 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, GOTO(out, rc); } + info->mti_mos[0] = mp; + rc = mdt_version_get_check(info, 0); + if (rc) + GOTO(out_unlock_parent, rc); + mdt_reint_init_ma(info, ma); if (!ma->ma_lmm || !ma->ma_cookie) GOTO(out_unlock_parent, rc = -EINVAL); @@ -542,6 +609,11 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_UNLINK_WRITE); + info->mti_mos[1] = mc; + rc = mdt_version_get_check(info, 1); + if (rc) + GOTO(out_unlock_child, rc); + /* * Now we can only make sure we need 
MA_INODE, in mdd layer, will check * whether need MA_LOV and MA_COOKIE. @@ -555,6 +627,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, mdt_handle_last_unlink(info, mc, ma); EXIT; +out_unlock_child: mdt_object_unlock_put(info, mc, child_lh, rc); out_unlock_parent: mdt_object_unlock_put(info, mp, parent_lh, rc); @@ -614,6 +687,11 @@ static int mdt_reint_link(struct mdt_thread_info *info, if (IS_ERR(mp)) RETURN(PTR_ERR(mp)); + info->mti_mos[0] = mp; + rc = mdt_version_get_check(info, 0); + if (rc) + GOTO(out_unlock_parent, rc); + /* step 2: find & lock the source */ lhs = &info->mti_lh[MDT_LH_CHILD]; mdt_lock_reg_init(lhs, LCK_EX); @@ -633,11 +711,17 @@ static int mdt_reint_link(struct mdt_thread_info *info, mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_LINK_WRITE); + info->mti_mos[1] = ms; + rc = mdt_version_get_check(info, 1); + if (rc) + GOTO(out_unlock_child, rc); + lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen); rc = mdo_link(info->mti_env, mdt_object_child(mp), mdt_object_child(ms), lname, ma); EXIT; +out_unlock_child: mdt_object_unlock_put(info, ms, lhs, rc); out_unlock_parent: mdt_object_unlock_put(info, mp, lhp, rc); @@ -871,6 +955,11 @@ static int mdt_reint_rename(struct mdt_thread_info *info, if (IS_ERR(msrcdir)) GOTO(out_rename_lock, rc = PTR_ERR(msrcdir)); + info->mti_mos[0] = msrcdir; + rc = mdt_version_get_check(info, 0); + if (rc) + GOTO(out_unlock_source, rc); + /* step 2: find & lock the target dir. 
*/ lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD]; mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt, @@ -892,7 +981,14 @@ static int mdt_reint_rename(struct mdt_thread_info *info, rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp, MDS_INODELOCK_UPDATE, MDT_LOCAL_LOCK); - if (rc != 0) + if (rc != 0) { + mdt_object_put(info->mti_env, mtgtdir); + GOTO(out_unlock_source, rc); + } + + info->mti_mos[1] = mtgtdir; + rc = mdt_version_get_check(info, 1); + if (rc) GOTO(out_unlock_target, rc); } } @@ -920,6 +1016,12 @@ static int mdt_reint_rename(struct mdt_thread_info *info, mdt_object_put(info->mti_env, mold); GOTO(out_unlock_target, rc); } + + info->mti_mos[2] = mold; + rc = mdt_version_get_check(info, 2); + if (rc) + GOTO(out_unlock_old, rc); + mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA); /* step 4: find & lock the new object. */ @@ -947,6 +1049,12 @@ static int mdt_reint_rename(struct mdt_thread_info *info, mdt_object_put(info->mti_env, mnew); GOTO(out_unlock_old, rc); } + + info->mti_mos[3] = mnew; + rc = mdt_version_get_check(info, 3); + if (rc) + GOTO(out_unlock_new, rc); + mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA); } else if (rc != -EREMOTE && rc != -ENOENT) GOTO(out_unlock_old, rc); diff --git a/lustre/mdt/mdt_xattr.c b/lustre/mdt/mdt_xattr.c index 8e646b2..ae04caf 100644 --- a/lustre/mdt/mdt_xattr.c +++ b/lustre/mdt/mdt_xattr.c @@ -62,7 +62,7 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info) static const char user_string[] = "user."; int size, rc; ENTRY; - + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_PACK)) RETURN(-ENOMEM); @@ -75,7 +75,7 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info) if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_XATTR) && !strncmp(xattr_name, user_string, sizeof(user_string) - 1)) RETURN(-EOPNOTSUPP); - + size = mo_xattr_get(info->mti_env, mdt_object_child(info->mti_object), &LU_BUF_NULL, xattr_name); @@ -352,6 +352,11 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, if (IS_ERR(obj)) 
GOTO(out, rc = PTR_ERR(obj)); + info->mti_mos[0] = obj; + rc = mdt_version_get_check(info, 0); + if (rc) + GOTO(out_unlock, rc); + if (unlikely(!(valid & OBD_MD_FLCTIME))) { CWARN("client miss to set OBD_MD_FLCTIME when " "setxattr: [object "DFID"] [valid %llu]\n", diff --git a/lustre/obdclass/dt_object.c b/lustre/obdclass/dt_object.c index 79c160e..e5d3c4d 100644 --- a/lustre/obdclass/dt_object.c +++ b/lustre/obdclass/dt_object.c @@ -97,7 +97,8 @@ int dt_txn_hook_start(const struct lu_env *env, result = 0; list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) { - if (cb->dtc_txn_start == NULL) + if (cb->dtc_txn_start == NULL || + !(cb->dtc_tag & env->le_ctx.lc_tags)) continue; result = cb->dtc_txn_start(env, param, cb->dtc_cookie); if (result < 0) @@ -115,7 +116,8 @@ int dt_txn_hook_stop(const struct lu_env *env, struct thandle *txn) result = 0; list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) { - if (cb->dtc_txn_stop == NULL) + if (cb->dtc_txn_stop == NULL || + !(cb->dtc_tag & env->le_ctx.lc_tags)) continue; result = cb->dtc_txn_stop(env, txn, cb->dtc_cookie); if (result < 0) @@ -133,7 +135,8 @@ int dt_txn_hook_commit(const struct lu_env *env, struct thandle *txn) result = 0; list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) { - if (cb->dtc_txn_commit == NULL) + if (cb->dtc_txn_commit == NULL || + !(cb->dtc_tag & env->le_ctx.lc_tags)) continue; result = cb->dtc_txn_commit(env, txn, cb->dtc_cookie); if (result < 0) @@ -400,5 +403,38 @@ void dt_global_fini(void) lu_context_key_degister(&dt_key); } +int dt_record_read(const struct lu_env *env, struct dt_object *dt, + struct lu_buf *buf, loff_t *pos) +{ + int rc; + + LASSERTF(dt != NULL, "dt is NULL when we want to read record\n"); + + rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA); + + if (rc == buf->lb_len) + rc = 0; + else if (rc >= 0) + rc = -EFAULT; + return rc; +} +EXPORT_SYMBOL(dt_record_read); + +int dt_record_write(const struct lu_env *env, struct dt_object *dt, 
+ const struct lu_buf *buf, loff_t *pos, struct thandle *th) +{ + int rc; + + LASSERTF(dt != NULL, "dt is NULL when we want to write record\n"); + LASSERT(th != NULL); + rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1); + if (rc == buf->lb_len) + rc = 0; + else if (rc >= 0) + rc = -EFAULT; + return rc; +} +EXPORT_SYMBOL(dt_record_write); + const struct dt_index_features dt_directory_features; EXPORT_SYMBOL(dt_directory_features); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 615b2f9..b0ba327 100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -723,6 +723,7 @@ static void class_export_destroy(struct obd_export *exp) ptlrpc_put_connection_superhack(exp->exp_connection); LASSERT(list_empty(&exp->exp_outstanding_replies)); + LASSERT(list_empty(&exp->exp_uncommitted_replies)); LASSERT(list_empty(&exp->exp_req_replay_queue)); LASSERT(list_empty(&exp->exp_queued_rpc)); obd_destroy_export(exp); @@ -781,6 +782,8 @@ struct obd_export *class_new_export(struct obd_device *obd, atomic_set(&export->exp_rpc_count, 0); export->exp_obd = obd; CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies); + spin_lock_init(&export->exp_uncommitted_replies_lock); + CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies); CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue); CFS_INIT_LIST_HEAD(&export->exp_handle.h_link); CFS_INIT_LIST_HEAD(&export->exp_queued_rpc); @@ -837,6 +840,15 @@ void class_unlink_export(struct obd_export *exp) exp->exp_obd->obd_num_exports--; spin_unlock(&exp->exp_obd->obd_dev_lock); + /* Always keep these counters valid */ + spin_lock_bh(&exp->exp_obd->obd_processing_task_lock); + if (exp->exp_delayed) + exp->exp_obd->obd_delayed_clients--; + else if (exp->exp_in_recovery) + exp->exp_obd->obd_recoverable_clients--; + else if (exp->exp_obd->obd_recovering) + exp->exp_obd->obd_max_recoverable_clients--; + spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock); class_export_put(exp); } 
EXPORT_SYMBOL(class_unlink_export); @@ -1125,9 +1137,10 @@ void class_disconnect_exports(struct obd_device *obd) ENTRY; /* Move all of the exports from obd_exports to a work list, en masse. */ + CFS_INIT_LIST_HEAD(&work_list); spin_lock(&obd->obd_dev_lock); - list_add(&work_list, &obd->obd_exports); - list_del_init(&obd->obd_exports); + list_splice_init(&obd->obd_exports, &work_list); + list_splice_init(&obd->obd_delayed_exports, &work_list); spin_unlock(&obd->obd_dev_lock); if (!list_empty(&work_list)) { @@ -1161,8 +1174,7 @@ int class_disconnect_stale_exports(struct obd_device *obd, if (test_export(exp)) continue; - list_del(&exp->exp_obd_chain); - list_add(&exp->exp_obd_chain, &work_list); + list_move(&exp->exp_obd_chain, &work_list); /* don't count self-export as client */ if (obd_uuid_equals(&exp->exp_client_uuid, &exp->exp_obd->obd_uuid)) diff --git a/lustre/obdclass/llog_obd.c b/lustre/obdclass/llog_obd.c index 2178f9e..41fb346 100644 --- a/lustre/obdclass/llog_obd.c +++ b/lustre/obdclass/llog_obd.c @@ -91,10 +91,10 @@ int __llog_ctxt_put(struct llog_ctxt *ctxt) } olg->olg_ctxts[ctxt->loc_idx] = NULL; spin_unlock(&olg->olg_lock); - + if (ctxt->loc_lcm) lcm_put(ctxt->loc_lcm); - + obd = ctxt->loc_obd; spin_lock(&obd->obd_dev_lock); spin_unlock(&obd->obd_dev_lock); /* sync with llog ctxt user thread */ diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index 74b94e4..f5d7316 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -247,14 +247,15 @@ int class_attach(struct lustre_cfg *lcfg) LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08X != %08X\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC); - LASSERTF(strncmp(obd->obd_name, name, strlen(name)) == 0, "%p obd_name %s != %s\n", - obd, obd->obd_name, name); + LASSERTF(strncmp(obd->obd_name, name, strlen(name)) == 0, + "%p obd_name %s != %s\n", obd, obd->obd_name, name); rwlock_init(&obd->obd_pool_lock); obd->obd_pool_limit = 0; obd->obd_pool_slv = 0; 
CFS_INIT_LIST_HEAD(&obd->obd_exports); + CFS_INIT_LIST_HEAD(&obd->obd_delayed_exports); CFS_INIT_LIST_HEAD(&obd->obd_exports_timed); CFS_INIT_LIST_HEAD(&obd->obd_nid_stats); spin_lock_init(&obd->obd_nid_lock); @@ -277,9 +278,6 @@ int class_attach(struct lustre_cfg *lcfg) llog_group_init(&obd->obd_olg, FILTER_GROUP_LLOG); - spin_lock_init(&obd->obd_uncommitted_replies_lock); - CFS_INIT_LIST_HEAD(&obd->obd_uncommitted_replies); - len = strlen(uuid); if (len >= sizeof(obd->obd_uuid)) { CERROR("uuid must be < %d bytes long\n", @@ -499,7 +497,7 @@ int class_cleanup(struct obd_device *obd, struct lustre_cfg *lcfg) /* Leave this on forever */ obd->obd_stopping = 1; spin_unlock(&obd->obd_dev_lock); - + if (lcfg->lcfg_bufcount >= 2 && LUSTRE_CFG_BUFLEN(lcfg, 1) > 0) { for (flag = lustre_cfg_string(lcfg, 1); *flag != 0; flag++) switch (*flag) { @@ -859,7 +857,7 @@ int class_process_config(struct lustre_cfg *lcfg) ldlm_timeout = max(lcfg->lcfg_num, 1U); if (ldlm_timeout >= obd_timeout) ldlm_timeout = max(obd_timeout / 3, 1U); - + GOTO(out, err = 0); } case LCFG_SET_UPCALL: { diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 76e558f..51344ba 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -87,12 +87,38 @@ cfs_mem_cache_t *ll_fmd_cachep; static void filter_commit_cb(struct obd_device *obd, __u64 transno, void *cb_data, int error) { - obd_transno_commit_cb(obd, transno, error); + struct obd_export *exp = cb_data; + obd_transno_commit_cb(obd, transno, exp, error); +} + +int filter_version_get_check(struct obd_export *exp, + struct obd_trans_info *oti, struct inode *inode) +{ + __u64 curr_version; + + if (inode == NULL || oti == NULL) + RETURN(0); + + curr_version = fsfilt_get_version(exp->exp_obd, inode); + if ((__s64)curr_version == -EOPNOTSUPP) + RETURN(0); + /* VBR: version is checked always because costs nothing */ + if (oti->oti_pre_version != 0 && + oti->oti_pre_version != curr_version) { + CDEBUG(D_INODE, "Version mismatch 
"LPX64" != "LPX64"\n", + oti->oti_pre_version, curr_version); + spin_lock(&exp->exp_lock); + exp->exp_vbr_failed = 1; + spin_unlock(&exp->exp_lock); + RETURN (-EOVERFLOW); + } + oti->oti_pre_version = curr_version; + RETURN(0); } /* Assumes caller has already pushed us into the kernel context. */ -int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti, - int rc, int force_sync) +int filter_finish_transno(struct obd_export *exp, struct inode *inode, + struct obd_trans_info *oti, int rc, int force_sync) { struct filter_obd *filter = &exp->exp_obd->u.filter; struct filter_export_data *fed = &exp->exp_filter_data; @@ -109,24 +135,28 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti, RETURN(rc); /* we don't allocate new transnos for replayed requests */ + spin_lock(&filter->fo_translock); if (oti->oti_transno == 0) { - spin_lock(&filter->fo_translock); last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1; filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd); - spin_unlock(&filter->fo_translock); - oti->oti_transno = last_rcvd; } else { - spin_lock(&filter->fo_translock); last_rcvd = oti->oti_transno; if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno)) filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd); + } + oti->oti_transno = last_rcvd; + if (last_rcvd <= le64_to_cpu(lcd->lcd_last_transno)) { spin_unlock(&filter->fo_translock); + LBUG(); } lcd->lcd_last_transno = cpu_to_le64(last_rcvd); + lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version); + lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid); + spin_unlock(&filter->fo_translock); - /* could get xid from oti, if it's ever needed */ - lcd->lcd_last_xid = 0; + if (inode) + fsfilt_set_version(exp->exp_obd, inode, last_rcvd); off = fed->fed_lr_off; if (off <= 0) { @@ -139,13 +169,13 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti, last_rcvd, oti->oti_handle, filter_commit_cb, - NULL); + exp); err = 
fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp, lcd, sizeof(*lcd), &off, force_sync | exp->exp_need_sync); if (force_sync) - filter_commit_cb(exp->exp_obd, last_rcvd, NULL, err); + filter_commit_cb(exp->exp_obd, last_rcvd, exp, err); } if (err) { log_pri = D_ERROR; @@ -328,6 +358,9 @@ static int filter_client_add(struct obd_device *obd, struct obd_export *exp, rc = PTR_ERR(handle); CERROR("unable to start transaction: rc %d\n", rc); } else { + fed->fed_lcd->lcd_last_epoch = + filter->fo_fsd->lsd_start_epoch; + exp->exp_last_request_time = cfs_time_current_sec(); rc = fsfilt_add_journal_cb(obd, 0, handle, target_client_add_cb, exp); if (rc == 0) { @@ -618,7 +651,7 @@ static int filter_init_export(struct obd_export *exp) static int filter_free_server_data(struct filter_obd *filter) { - OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd)); + OBD_FREE_PTR(filter->fo_fsd); filter->fo_fsd = NULL; OBD_FREE(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8); filter->fo_last_rcvd_slots = NULL; @@ -639,7 +672,6 @@ int filter_update_server_data(struct obd_device *obd, struct file *filp, CDEBUG(D_INODE, "server last_mount: "LPU64"\n", le64_to_cpu(fsd->lsd_mount_count)); - fsd->lsd_compat14 = fsd->lsd_last_transno; rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off, force_sync); if (rc) CERROR("error writing lr_server_data: rc = %d\n", rc); @@ -683,6 +715,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) struct inode *inode = filp->f_dentry->d_inode; unsigned long last_rcvd_size = i_size_read(inode); __u64 mount_count; + __u32 start_epoch; int cl_idx; loff_t off = 0; int rc; @@ -754,7 +787,11 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) GOTO(err_fsd, rc = -EINVAL); } - CDEBUG(D_INODE, "%s: server last_transno : "LPU64"\n", + start_epoch = le32_to_cpu(fsd->lsd_start_epoch); + + CDEBUG(D_INODE, "%s: server start_epoch : %#x\n", + obd->obd_name, start_epoch); + CDEBUG(D_INODE, "%s: server 
last_transno : "LPX64"\n", obd->obd_name, le64_to_cpu(fsd->lsd_last_transno)); CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n", obd->obd_name, mount_count + 1); @@ -834,12 +871,16 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) /* can't fail for existing client */ LASSERTF(rc == 0, "rc = %d\n", rc); - lcd = NULL; + /* VBR: set export last committed */ + exp->exp_last_committed = last_rcvd; spin_lock(&exp->exp_lock); exp->exp_connecting = 0; exp->exp_in_recovery = 0; spin_unlock(&exp->exp_lock); + spin_lock_bh(&obd->obd_processing_task_lock); obd->obd_max_recoverable_clients++; + spin_unlock_bh(&obd->obd_processing_task_lock); + lcd = NULL; class_export_put(exp); } @@ -856,7 +897,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp) obd->obd_last_committed = le64_to_cpu(fsd->lsd_last_transno); - target_recovery_init(obd, ost_handle); + target_recovery_init(&filter->fo_lut, ost_handle); out: filter->fo_mount_count = mount_count + 1; @@ -1308,6 +1349,9 @@ static int filter_prep(struct obd_device *obd) GOTO(err_filp, rc = -EOPNOTSUPP); } + /** lu_target has very limited use in filter now */ + lut_init(NULL, &filter->fo_lut, obd, NULL); + rc = filter_init_server_data(obd, file); if (rc) { CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc); @@ -2069,10 +2113,11 @@ int filter_common_setup(struct obd_device *obd, struct lustre_cfg* lcfg, spin_lock_init(&filter->fo_llog_list_lock); filter->fo_fl_oss_capa = 1; + CFS_INIT_LIST_HEAD(&filter->fo_capa_keys); filter->fo_capa_hash = init_capa_hash(); if (filter->fo_capa_hash == NULL) - GOTO(err_ops, rc = -ENOMEM); + GOTO(err_post, rc = -ENOMEM); sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid); obd->obd_namespace = ldlm_namespace_new(obd, ns_name, LDLM_NAMESPACE_SERVER, @@ -2584,6 +2629,8 @@ static int filter_precleanup(struct obd_device *obd, case OBD_CLEANUP_EARLY: break; case OBD_CLEANUP_EXPORTS: + /* Stop recovery before namespace cleanup. 
*/ + target_stop_recovery_thread(obd); target_cleanup_recovery(obd); rc = filter_llog_preclean(obd); break; @@ -2615,10 +2662,6 @@ static int filter_cleanup(struct obd_device *obd) lprocfs_obd_cleanup(obd); lquota_cleanup(filter_quota_interface_ref, obd); - /* Stop recovery before namespace cleanup. */ - target_stop_recovery_thread(obd); - target_cleanup_recovery(obd); - ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force); obd->obd_namespace = NULL; @@ -3106,7 +3149,6 @@ static void filter_revimp_update(struct obd_export *exp) static int filter_ping(struct obd_export *exp) { filter_fmd_expire(exp); - return 0; } @@ -3246,6 +3288,11 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, old_size = i_size_read(inode); } + /* VBR: version recovery check */ + rc = filter_version_get_check(exp, oti, inode); + if (rc) + GOTO(out_unlock, rc); + /* If the inode still has SUID+SGID bits set (see filter_precreate()) * then we will accept the UID+GID sent by the client during write for * initializing the ownership of this inode. We only allow this to @@ -3308,7 +3355,7 @@ int filter_setattr_internal(struct obd_export *exp, struct dentry *dentry, * sure we have one left for the last_rcvd update. */ err = fsfilt_extend(exp->exp_obd, inode, 1, handle); - rc = filter_finish_transno(exp, oti, rc, sync); + rc = filter_finish_transno(exp, inode, oti, rc, sync); if (sync) { filter_cancel_cookies_cb(exp->exp_obd, 0, fcc, rc); fcc = NULL; @@ -4047,6 +4094,12 @@ int filter_destroy(struct obd_export *exp, struct obdo *oa, * (see BUG 4180) -bzzz */ LOCK_INODE_MUTEX(dchild->d_inode); + + /* VBR: version recovery check */ + rc = filter_version_get_check(exp, oti, dchild->d_inode); + if (rc) + GOTO(cleanup, rc); + handle = fsfilt_start_log(obd, dchild->d_inode, FSFILT_OP_SETATTR, NULL, 1); if (IS_ERR(handle)) { @@ -4103,7 +4156,7 @@ cleanup: * on commit. then we call callback directly to free * the fcc. 
*/ - rc = filter_finish_transno(exp, oti, rc, sync); + rc = filter_finish_transno(exp, NULL, oti, rc, sync); if (sync) { filter_cancel_cookies_cb(obd, 0, fcc, rc); fcc = NULL; diff --git a/lustre/obdfilter/filter_internal.h b/lustre/obdfilter/filter_internal.h index 9cb6de0..dbe4fae 100644 --- a/lustre/obdfilter/filter_internal.h +++ b/lustre/obdfilter/filter_internal.h @@ -40,7 +40,6 @@ #ifdef __KERNEL__ # include #endif -#include #include #include #include @@ -127,7 +126,7 @@ enum { extern int *obdfilter_created_scratchpad; extern void target_recovery_fini(struct obd_device *obd); -extern void target_recovery_init(struct obd_device *obd, +extern void target_recovery_init(struct lu_target *lut, svc_handler_t handler); /* filter.c */ @@ -138,8 +137,8 @@ struct dentry *__filter_oa2dentry(struct obd_device *obd, struct obdo *oa, const char *what, int quiet); #define filter_oa2dentry(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__, 0) -int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc, - int force_sync); +int filter_finish_transno(struct obd_export *, struct inode *, + struct obd_trans_info *, int rc, int force_sync); __u64 filter_next_id(struct filter_obd *, struct obdo *); __u64 filter_last_id(struct filter_obd *, obd_gr group); int filter_update_fidea(struct obd_export *exp, struct inode *inode, diff --git a/lustre/obdfilter/filter_io_26.c b/lustre/obdfilter/filter_io_26.c index 95835a3..3bd68f6 100644 --- a/lustre/obdfilter/filter_io_26.c +++ b/lustre/obdfilter/filter_io_26.c @@ -500,7 +500,7 @@ int filter_direct_io(int rw, struct dentry *dchild, struct filter_iobuf *iobuf, UNLOCK_INODE_MUTEX(inode); - rc2 = filter_finish_transno(exp, oti, 0, 0); + rc2 = filter_finish_transno(exp, inode, oti, 0, 0); if (rc2 != 0) { CERROR("can't close transaction: %d\n", rc2); if (rc == 0) diff --git a/lustre/osd/osd_handler.c b/lustre/osd/osd_handler.c index a6c7e98..b8e1a2b 100644 --- a/lustre/osd/osd_handler.c +++ b/lustre/osd/osd_handler.c @@ 
-2078,6 +2078,35 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt) RETURN(rc); } +/* + * Get the 64-bit version for an inode. + */ +static dt_obj_version_t osd_object_version_get(const struct lu_env *env, + struct dt_object *dt) +{ + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n", + LDISKFS_I(inode)->i_fs_version, inode->i_ino); + return LDISKFS_I(inode)->i_fs_version; +} + +/* + * Set the 64-bit version and return the old version. + */ +static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt, + dt_obj_version_t new_version) +{ + struct inode *inode = osd_dt_obj(dt)->oo_inode; + + CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n", + new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino); + LDISKFS_I(inode)->i_fs_version = new_version; + /** Version is set after all inode operations are finished, + * so we should mark it dirty here */ + inode->i_sb->s_op->dirty_inode(inode); +} + static int osd_data_get(const struct lu_env *env, struct dt_object *dt, void **data) { @@ -2106,6 +2135,8 @@ static const struct dt_object_operations osd_obj_ops = { .do_xattr_list = osd_xattr_list, .do_capa_get = osd_capa_get, .do_object_sync = osd_object_sync, + .do_version_get = osd_object_version_get, + .do_version_set = osd_object_version_set, .do_data_get = osd_data_get, }; @@ -2131,6 +2162,8 @@ static const struct dt_object_operations osd_obj_ea_ops = { .do_xattr_list = osd_xattr_list, .do_capa_get = osd_capa_get, .do_object_sync = osd_object_sync, + .do_version_get = osd_object_version_get, + .do_version_set = osd_object_version_set, .do_data_get = osd_data_get, }; diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index 3660c7f..a0ccb07 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -13,7 +13,7 @@ ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o ptlrpc_objs += llog_net.o llog_client.o 
llog_server.o import.o ptlrpcd.o ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o ptlrpc_objs += sec.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o -ptlrpc_objs += sec_null.o sec_plain.o +ptlrpc_objs += sec_null.o sec_plain.o target.o ptlrpc-objs := $(ldlm_objs) $(ptlrpc_objs) diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index d2a5cf9..375fb72 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -940,6 +940,27 @@ static int ptlrpc_check_status(struct ptlrpc_request *req) } /** + * save pre-versions for replay + */ +static void ptlrpc_save_versions(struct ptlrpc_request *req) +{ + struct lustre_msg *repmsg = req->rq_repmsg; + struct lustre_msg *reqmsg = req->rq_reqmsg; + __u64 *versions = lustre_msg_get_versions(repmsg); + ENTRY; + + if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) + return; + + LASSERT(versions); + lustre_msg_set_versions(reqmsg, versions); + CDEBUG(D_INFO, "Client save versions ["LPX64"/"LPX64"]\n", + versions[0], versions[1]); + + EXIT; +} + +/** * Callback function called when client receives RPC reply for \a req. 
*/ static int after_reply(struct ptlrpc_request *req) @@ -1027,7 +1048,7 @@ static int after_reply(struct ptlrpc_request *req) lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno); } - if (req->rq_import->imp_replayable) { + if (imp->imp_replayable) { spin_lock(&imp->imp_lock); /* * No point in adding already-committed requests to the replay @@ -1036,9 +1057,11 @@ static int after_reply(struct ptlrpc_request *req) if (req->rq_transno != 0 && (req->rq_transno > lustre_msg_get_last_committed(req->rq_repmsg) || - req->rq_replay)) + req->rq_replay)) { + /** version recovery */ + ptlrpc_save_versions(req); ptlrpc_retain_replayable_request(req, imp); - else if (req->rq_commit_cb != NULL) { + } else if (req->rq_commit_cb != NULL) { spin_unlock(&imp->imp_lock); req->rq_commit_cb(req); spin_lock(&imp->imp_lock); @@ -2328,13 +2351,31 @@ static int ptlrpc_replay_interpret(const struct lu_env *env, lustre_msg_get_status(req->rq_repmsg) == -ENODEV)) GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg)); - /* The transno had better not change over replay. */ - LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) == - lustre_msg_get_transno(req->rq_repmsg) || - lustre_msg_get_transno(req->rq_repmsg) == 0, - LPX64"/"LPX64"\n", - lustre_msg_get_transno(req->rq_reqmsg), - lustre_msg_get_transno(req->rq_repmsg)); + /** VBR: check version failure */ + if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) { + /** replay was failed due to version mismatch */ + DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n"); + spin_lock(&imp->imp_lock); + imp->imp_vbr_failed = 1; + imp->imp_no_lock_replay = 1; + spin_unlock(&imp->imp_lock); + } else { + /** The transno had better not change over replay. 
*/ + LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) == + lustre_msg_get_transno(req->rq_repmsg) || + lustre_msg_get_transno(req->rq_repmsg) == 0, + LPX64"/"LPX64"\n", + lustre_msg_get_transno(req->rq_reqmsg), + lustre_msg_get_transno(req->rq_repmsg)); + } + + spin_lock(&imp->imp_lock); + /** if replays by version then gap was occur on server, no trust to locks */ + if (lustre_msg_get_flags(req->rq_repmsg) & MSG_VERSION_REPLAY) + imp->imp_no_lock_replay = 1; + imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg); + spin_unlock(&imp->imp_lock); + LASSERT(imp->imp_last_replay_transno); DEBUG_REQ(D_HA, req, "got rep"); diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index eef7b7e..baa4fb8 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -1149,12 +1149,23 @@ static int completed_replay_interpret(const struct lu_env *env, { ENTRY; atomic_dec(&req->rq_import->imp_replay_inflight); - if (req->rq_status == 0) { + if (req->rq_status == 0 && + !req->rq_import->imp_vbr_failed) { ptlrpc_import_recovery_state_machine(req->rq_import); } else { - CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, " - "reconnecting\n", - req->rq_import->imp_obd->obd_name, req->rq_status); + if (req->rq_import->imp_vbr_failed) { + CDEBUG(D_WARNING, + "%s: version recovery fails, reconnecting\n", + req->rq_import->imp_obd->obd_name); + spin_lock(&req->rq_import->imp_lock); + req->rq_import->imp_vbr_failed = 0; + spin_unlock(&req->rq_import->imp_lock); + } else { + CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, " + "reconnecting\n", + req->rq_import->imp_obd->obd_name, + req->rq_status); + } ptlrpc_connect_import(req->rq_import, NULL); } diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 93168eb..51e109b 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -774,6 +774,12 @@ void *lustre_swab_repbuf(struct ptlrpc_request *req, int index, int min_size, return lustre_swab_buf(req->rq_repmsg, index, 
min_size, swabber); } +static inline struct ptlrpc_body *lustre_msg_ptlrpc_body(struct lustre_msg *msg) +{ + return lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, + sizeof(struct ptlrpc_body)); +} + __u32 lustre_msghdr_get_flags(struct lustre_msg *msg) { switch (msg->lm_magic) { @@ -809,9 +815,7 @@ __u32 lustre_msg_get_flags(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -829,9 +833,7 @@ void lustre_msg_add_flags(struct lustre_msg *msg, int flags) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_flags |= flags; return; @@ -845,9 +847,7 @@ void lustre_msg_set_flags(struct lustre_msg *msg, int flags) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_flags = flags; return; @@ -862,9 +862,7 @@ void lustre_msg_clear_flags(struct lustre_msg *msg, int flags) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_flags &= ~(MSG_GEN_FLAG_MASK & flags); return; @@ -879,9 +877,7 @@ __u32 lustre_msg_get_op_flags(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct 
ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -897,9 +893,7 @@ void lustre_msg_add_op_flags(struct lustre_msg *msg, int flags) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_op_flags |= flags; return; @@ -913,9 +907,7 @@ void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_op_flags |= flags; return; @@ -930,9 +922,7 @@ struct lustre_handle *lustre_msg_get_handle(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return NULL; @@ -950,9 +940,7 @@ __u32 lustre_msg_get_type(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return PTL_RPC_MSG_ERR; @@ -970,9 +958,7 @@ __u32 lustre_msg_get_version(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, 
sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -990,9 +976,7 @@ void lustre_msg_add_version(struct lustre_msg *msg, int version) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_version |= version; return; @@ -1007,9 +991,7 @@ __u32 lustre_msg_get_opc(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1027,9 +1009,7 @@ __u64 lustre_msg_get_last_xid(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1047,9 +1027,7 @@ __u64 lustre_msg_get_last_committed(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1062,14 +1040,31 @@ __u64 lustre_msg_get_last_committed(struct lustre_msg *msg) } } +__u64 *lustre_msg_get_versions(struct lustre_msg *msg) +{ + switch (msg->lm_magic) { + case LUSTRE_MSG_MAGIC_V1: + return NULL; + case LUSTRE_MSG_MAGIC_V2: { + struct ptlrpc_body *pb = 
lustre_msg_ptlrpc_body(msg); + if (!pb) { + CERROR("invalid msg %p: no ptlrpc body!\n", msg); + return NULL; + } + return pb->pb_pre_versions; + } + default: + CERROR("incorrect message magic: %08x\n", msg->lm_magic); + return NULL; + } +} + __u64 lustre_msg_get_transno(struct lustre_msg *msg) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1087,9 +1082,7 @@ int lustre_msg_get_status(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return -EINVAL; @@ -1108,9 +1101,7 @@ __u64 lustre_msg_get_slv(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return -EINVAL; @@ -1129,9 +1120,7 @@ void lustre_msg_set_slv(struct lustre_msg *msg, __u64 slv) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return; @@ -1150,9 +1139,7 @@ __u32 lustre_msg_get_limit(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, 
sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return -EINVAL; @@ -1171,9 +1158,7 @@ void lustre_msg_set_limit(struct lustre_msg *msg, __u64 limit) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return; @@ -1192,9 +1177,7 @@ __u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1238,9 +1221,7 @@ __u32 lustre_msg_get_timeout(struct lustre_msg *msg) return 0; case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1262,9 +1243,7 @@ __u32 lustre_msg_get_service_time(struct lustre_msg *msg) return 0; case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1301,8 +1280,7 @@ __u32 lustre_msg_calc_cksum(struct lustre_msg *msg) return 0; case LUSTRE_MSG_MAGIC_V2: case LUSTRE_MSG_MAGIC_V2_SWABBED: { - struct ptlrpc_body *pb; - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc 
body!\n", msg); return crc32_le(~(__u32)0, (unsigned char *)pb, sizeof(*pb)); } @@ -1316,9 +1294,7 @@ void lustre_msg_set_handle(struct lustre_msg *msg, struct lustre_handle *handle) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_handle = *handle; return; @@ -1332,9 +1308,7 @@ void lustre_msg_set_type(struct lustre_msg *msg, __u32 type) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_type = type; return; @@ -1348,9 +1322,7 @@ void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_opc = opc; return; @@ -1364,9 +1336,7 @@ void lustre_msg_set_last_xid(struct lustre_msg *msg, __u64 last_xid) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_last_xid = last_xid; return; @@ -1380,9 +1350,7 @@ void lustre_msg_set_last_committed(struct lustre_msg *msg, __u64 last_committed) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_last_committed = 
last_committed; return; @@ -1392,13 +1360,30 @@ void lustre_msg_set_last_committed(struct lustre_msg *msg, __u64 last_committed) } } -void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno) +void lustre_msg_set_versions(struct lustre_msg *msg, __u64 *versions) { switch (msg->lm_magic) { + case LUSTRE_MSG_MAGIC_V1: + return; case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); + pb->pb_pre_versions[0] = versions[0]; + pb->pb_pre_versions[1] = versions[1]; + pb->pb_pre_versions[2] = versions[2]; + pb->pb_pre_versions[3] = versions[3]; + return; + } + default: + LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic); + } +} - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); +void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno) +{ + switch (msg->lm_magic) { + case LUSTRE_MSG_MAGIC_V2: { + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_transno = transno; return; @@ -1412,9 +1397,7 @@ void lustre_msg_set_status(struct lustre_msg *msg, __u32 status) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_status = status; return; @@ -1428,9 +1411,7 @@ void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_conn_cnt = conn_cnt; return; @@ -1446,9 +1427,7 @@ void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout) case LUSTRE_MSG_MAGIC_V1: return; case 
LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_timeout = timeout; return; @@ -1464,9 +1443,7 @@ void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time) case LUSTRE_MSG_MAGIC_V1: return; case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb; - - pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_service_time = service_time; return; diff --git a/lustre/ptlrpc/pinger.c b/lustre/ptlrpc/pinger.c index daec0f5..dfcc33c 100644 --- a/lustre/ptlrpc/pinger.c +++ b/lustre/ptlrpc/pinger.c @@ -124,6 +124,12 @@ void ptlrpc_ping_import_soon(struct obd_import *imp) imp->imp_next_ping = cfs_time_current(); } +static inline int imp_is_deactive(struct obd_import *imp) +{ + return (imp->imp_deactive || + OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_IMP_DEACTIVE)); +} + static inline int ptlrpc_next_reconnect(struct obd_import *imp) { if (imp->imp_server_timeout) @@ -237,13 +243,13 @@ static void ptlrpc_pinger_process_import(struct obd_import *imp, this_ping) && force == 0) return; - if (level == LUSTRE_IMP_DISCON && !imp->imp_deactive) { + if (level == LUSTRE_IMP_DISCON && !imp_is_deactive(imp)) { /* wait at least a timeout before trying recovery again */ imp->imp_next_ping = ptlrpc_next_reconnect(imp); ptlrpc_initiate_recovery(imp); } else if (level != LUSTRE_IMP_FULL || imp->imp_obd->obd_no_recov || - imp->imp_deactive) { + imp_is_deactive(imp)) { CDEBUG(D_HA, "not pinging %s (in recovery " " or recovery disabled: %s)\n", obd2cli_tgt(imp->imp_obd), @@ -939,11 +945,13 @@ void ptlrpc_pinger_wake_up() CDEBUG(D_RPCTRACE, "checking import %s->%s\n", imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd)); #ifdef ENABLE_LIBLUSTRE_RECOVERY - if (imp->imp_state == 
LUSTRE_IMP_DISCON && !imp->imp_deactive) + if (imp->imp_state == LUSTRE_IMP_DISCON && + !imp_is_deactive(imp)) #else /*XXX only recover for the initial connection */ if (!lustre_handle_is_used(&imp->imp_remote_handle) && - imp->imp_state == LUSTRE_IMP_DISCON && !imp->imp_deactive) + imp->imp_state == LUSTRE_IMP_DISCON && + !imp_is_deactive(imp)) #endif ptlrpc_initiate_recovery(imp); else if (imp->imp_state != LUSTRE_IMP_FULL) @@ -951,7 +959,7 @@ void ptlrpc_pinger_wake_up() "state %d, deactive %d\n", imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd), imp->imp_state, - imp->imp_deactive); + imp_is_deactive(imp)); } EXIT; #endif diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index 681873d..7a9e4d5 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -288,6 +288,7 @@ EXPORT_SYMBOL(lustre_msg_add_version); EXPORT_SYMBOL(lustre_msg_get_opc); EXPORT_SYMBOL(lustre_msg_get_last_xid); EXPORT_SYMBOL(lustre_msg_get_last_committed); +EXPORT_SYMBOL(lustre_msg_get_versions); EXPORT_SYMBOL(lustre_msg_get_transno); EXPORT_SYMBOL(lustre_msg_get_status); EXPORT_SYMBOL(lustre_msg_get_slv); @@ -302,6 +303,7 @@ EXPORT_SYMBOL(lustre_msg_set_type); EXPORT_SYMBOL(lustre_msg_set_opc); EXPORT_SYMBOL(lustre_msg_set_last_xid); EXPORT_SYMBOL(lustre_msg_set_last_committed); +EXPORT_SYMBOL(lustre_msg_set_versions); EXPORT_SYMBOL(lustre_msg_set_transno); EXPORT_SYMBOL(lustre_msg_set_status); EXPORT_SYMBOL(lustre_msg_set_conn_cnt); diff --git a/lustre/ptlrpc/recov_thread.c b/lustre/ptlrpc/recov_thread.c index ef02ea7..3dad818 100644 --- a/lustre/ptlrpc/recov_thread.c +++ b/lustre/ptlrpc/recov_thread.c @@ -336,7 +336,7 @@ static struct llog_canceld_ctxt *llcd_detach(struct llog_ctxt *ctxt) static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt) { struct llog_canceld_ctxt *llcd; - + LASSERT(ctxt); llcd = llcd_alloc(ctxt->loc_lcm); if (!llcd) { CERROR("Can't alloc an llcd for ctxt %p\n", ctxt); @@ -597,7 +597,7 @@ int 
llog_obd_repl_cancel(struct llog_ctxt *ctxt, } lcm = ctxt->loc_lcm; CDEBUG(D_INFO, "cancel on lsm %p\n", lcm); - + /* * Let's check if we have all structures alive. We also check for * possible shutdown. Do nothing if we're stopping. diff --git a/lustre/ptlrpc/recover.c b/lustre/ptlrpc/recover.c index 4adf785..7f27502 100644 --- a/lustre/ptlrpc/recover.c +++ b/lustre/ptlrpc/recover.c @@ -116,17 +116,12 @@ int ptlrpc_replay_next(struct obd_import *imp, int *inflight) req and send it again. If, however, the last sent transno has been committed then we continue replay from the next request. */ - if (imp->imp_resend_replay && - req->rq_transno == last_transno) { - lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT); - break; - } - if (req->rq_transno > last_transno) { - imp->imp_last_replay_transno = req->rq_transno; + if (imp->imp_resend_replay) + lustre_msg_add_flags(req->rq_reqmsg, + MSG_RESENT); break; } - req = NULL; } diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c index 99f79a4..37dcbde 100644 --- a/lustre/ptlrpc/service.c +++ b/lustre/ptlrpc/service.c @@ -354,11 +354,41 @@ ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs) EXIT; } -void -ptlrpc_commit_replies (struct obd_device *obd) +void ptlrpc_commit_replies_alt(struct obd_export *exp) { - struct list_head *tmp; - struct list_head *nxt; + struct ptlrpc_reply_state *rs, *nxt; + struct list_head committed_list; + DECLARE_RS_BATCH(batch); + ENTRY; + + CFS_INIT_LIST_HEAD(&committed_list); + spin_lock(&exp->exp_uncommitted_replies_lock); + list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies, + rs_obd_list) { + LASSERT (rs->rs_difficult); + LASSERT(rs->rs_export); + if (likely(rs->rs_transno <= exp->exp_last_committed)) + list_move(&rs->rs_obd_list, &committed_list); + else + break; + } + spin_unlock(&exp->exp_uncommitted_replies_lock); + + /* XXX: do we need this in context of commit callback? 
maybe separate thread + * should work this out */ + rs_batch_init(&batch); + /* get replies that have been committed and get their service + * to attend to complete them. */ + list_for_each_entry_safe(rs, nxt, &committed_list, rs_obd_list) { + list_del_init(&rs->rs_obd_list); + rs_batch_add(&batch, rs); + } + rs_batch_fini(&batch); + EXIT; +} +void ptlrpc_commit_replies(struct obd_export *exp) +{ + struct ptlrpc_reply_state *rs, *nxt; DECLARE_RS_BATCH(batch); ENTRY; @@ -367,19 +397,18 @@ ptlrpc_commit_replies (struct obd_device *obd) * to attend to complete them. */ /* CAVEAT EMPTOR: spinlock ordering!!! */ - spin_lock(&obd->obd_uncommitted_replies_lock); - list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) { - struct ptlrpc_reply_state *rs = - list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list); - + spin_lock(&exp->exp_uncommitted_replies_lock); + list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies, + rs_obd_list) { LASSERT (rs->rs_difficult); - - if (rs->rs_transno <= obd->obd_last_committed) { + /* VBR: per-export last_committed */ + LASSERT(rs->rs_export); + if (rs->rs_transno <= exp->exp_last_committed) { list_del_init(&rs->rs_obd_list); rs_batch_add(&batch, rs); } } - spin_unlock(&obd->obd_uncommitted_replies_lock); + spin_unlock(&exp->exp_uncommitted_replies_lock); rs_batch_fini(&batch); EXIT; } @@ -532,14 +561,14 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size, array->paa_count = 0; array->paa_deadline = -1; - /* allocate memory for srv_at_array (ptlrpc_at_array) */ + /* allocate memory for srv_at_array (ptlrpc_at_array) */ OBD_ALLOC(array->paa_reqs_array, sizeof(struct list_head) * size); if (array->paa_reqs_array == NULL) GOTO(failed, NULL); for (index = 0; index < size; index++) CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]); - + OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size); if (array->paa_reqs_count == NULL) GOTO(failed, NULL); @@ -706,8 +735,8 @@ static void 
ptlrpc_server_finish_request(struct ptlrpc_request *req) if (req->rq_at_linked) { struct ptlrpc_at_array *array = &svc->srv_at_array; __u32 index = req->rq_at_index; - - req->rq_at_linked = 0; + + req->rq_at_linked = 0; array->paa_reqs_count[index]--; array->paa_count--; } @@ -1096,7 +1125,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service *svc) rq->rq_at_linked = 0; continue; } - + /* update the earliest deadline */ if (deadline == -1 || rq->rq_deadline < deadline) deadline = rq->rq_deadline; @@ -1674,12 +1703,12 @@ ptlrpc_handle_rs (struct ptlrpc_reply_state *rs) list_del_init (&rs->rs_exp_list); spin_unlock (&exp->exp_lock); - /* Avoid obd_uncommitted_replies_lock contention if we 100% sure that + /* Avoid exp_uncommitted_replies_lock contention if we 100% sure that * rs has been removed from the list already */ if (!list_empty_careful(&rs->rs_obd_list)) { - spin_lock(&obd->obd_uncommitted_replies_lock); + spin_lock(&exp->exp_uncommitted_replies_lock); list_del_init(&rs->rs_obd_list); - spin_unlock(&obd->obd_uncommitted_replies_lock); + spin_unlock(&exp->exp_uncommitted_replies_lock); } spin_lock(&rs->rs_lock); @@ -2482,17 +2511,17 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) cfs_timer_disarm(&service->srv_at_timer); if (array->paa_reqs_array != NULL) { - OBD_FREE(array->paa_reqs_array, + OBD_FREE(array->paa_reqs_array, sizeof(struct list_head) * array->paa_size); array->paa_reqs_array = NULL; } - + if (array->paa_reqs_count != NULL) { - OBD_FREE(array->paa_reqs_count, + OBD_FREE(array->paa_reqs_count, sizeof(__u32) * array->paa_size); array->paa_reqs_count= NULL; } - + OBD_FREE_PTR(service); RETURN(0); } diff --git a/lustre/ptlrpc/target.c b/lustre/ptlrpc/target.c new file mode 100644 index 0000000..0be3950 --- /dev/null +++ b/lustre/ptlrpc/target.c @@ -0,0 +1,365 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE 
COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. + * + * Lustre Common Target + * These are common function for MDT and OST recovery-related functionality + * + * Author: Mikhail Pershin + */ + +#include +#include +/** + * Update client data in last_rcvd file. An obd API + */ +static int obt_client_data_update(struct obd_export *exp) +{ + struct lu_export_data *led = &exp->exp_target_data; + struct obd_device_target *obt = &exp->exp_obd->u.obt; + loff_t off = led->led_lr_off; + int rc = 0; + + rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp, + led->led_lcd, sizeof(*led->led_lcd), &off, 0); + + CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n", + led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch), + le32_to_cpu(obt->obt_lsd->lsd_start_epoch)); + + return rc; +} + +/** + * Update server data in last_rcvd file. 
An obd API + */ +int obt_server_data_update(struct obd_device *obd, int force_sync) +{ + struct obd_device_target *obt = &obd->u.obt; + loff_t off = 0; + int rc; + ENTRY; + + CDEBUG(D_SUPER, + "%s: mount_count is "LPU64", last_transno is "LPU64"\n", + obt->obt_lsd->lsd_uuid, + le64_to_cpu(obt->obt_lsd->lsd_mount_count), + le64_to_cpu(obt->obt_lsd->lsd_last_transno)); + + rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd, + sizeof(*obt->obt_lsd), &off, force_sync); + if (rc) + CERROR("error writing lr_server_data: rc = %d\n", rc); + + RETURN(rc); +} + +/** + * Update client epoch with server's one + */ +void obt_client_epoch_update(struct obd_export *exp) +{ + struct lsd_client_data *lcd = exp->exp_target_data.led_lcd; + struct obd_device_target *obt = &exp->exp_obd->u.obt; + + /** VBR: set client last_epoch to current epoch */ + if (le32_to_cpu(lcd->lcd_last_epoch) >= + le32_to_cpu(obt->obt_lsd->lsd_start_epoch)) + return; + lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch; + obt_client_data_update(exp); +} + +/** + * Increment server epoch. 
An obd API + */ +static void obt_boot_epoch_update(struct obd_device *obd) +{ + __u32 start_epoch; + struct obd_device_target *obt = &obd->u.obt; + struct ptlrpc_request *req; + struct list_head client_list; + + spin_lock(&obt->obt_translock); + start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1; + obt->obt_last_transno = cpu_to_le64((__u64)start_epoch << + LR_EPOCH_BITS); + obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch); + spin_unlock(&obt->obt_translock); + + CFS_INIT_LIST_HEAD(&client_list); + spin_lock_bh(&obd->obd_processing_task_lock); + list_splice_init(&obd->obd_final_req_queue, &client_list); + spin_unlock_bh(&obd->obd_processing_task_lock); + + /** + * go through list of exports participated in recovery and + * set new epoch for them + */ + list_for_each_entry(req, &client_list, rq_list) { + LASSERT(!req->rq_export->exp_delayed); + obt_client_epoch_update(req->rq_export); + } + /** return list back at once */ + spin_lock_bh(&obd->obd_processing_task_lock); + list_splice_init(&client_list, &obd->obd_final_req_queue); + spin_unlock_bh(&obd->obd_processing_task_lock); + obt_server_data_update(obd, 1); +} + +/** + * write data in last_rcvd file. 
+ */ +static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut, + const struct lu_buf *buf, loff_t *off, int sync) +{ + struct thandle *th; + struct txn_param p; + int rc, credits; + ENTRY; + + credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom, + DTO_WRITE_BLOCK); + txn_param_init(&p, credits); + + th = dt_trans_start(env, lut->lut_bottom, &p); + if (IS_ERR(th)) + RETURN(PTR_ERR(th)); + + rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th); + dt_trans_stop(env, lut->lut_bottom, th); + + CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n" + "uuid = %s\nlast_transno = "LPU64"\n", + rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno); + + RETURN(rc); +} + +/** + * Update client data in last_rcvd + */ +int lut_client_data_update(const struct lu_env *env, struct lu_target *lut, + struct obd_export *exp) +{ + struct lu_export_data *led = &exp->exp_target_data; + struct lsd_client_data tmp_lcd; + loff_t tmp_off = led->led_lr_off; + struct lu_buf tmp_buf = { + .lb_buf = &tmp_lcd, + .lb_len = sizeof(tmp_lcd) + }; + int rc = 0; + + lcd_cpu_to_le(led->led_lcd, &tmp_lcd); + LASSERT(lut->lut_last_rcvd); + rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0); + + return rc; +} + +/** + * Update server data in last_rcvd + */ +static int lut_server_data_update(const struct lu_env *env, + struct lu_target *lut, int sync) +{ + struct lr_server_data tmp_lsd; + loff_t tmp_off = 0; + struct lu_buf tmp_buf = { + .lb_buf = &tmp_lsd, + .lb_len = sizeof(tmp_lsd) + }; + int rc = 0; + ENTRY; + + CDEBUG(D_SUPER, + "%s: mount_count is "LPU64", last_transno is "LPU64"\n", + lut->lut_lsd.lsd_uuid, lut->lut_mount_count, + lut->lut_last_transno); + + spin_lock(&lut->lut_translock); + lut->lut_lsd.lsd_last_transno = lut->lut_last_transno; + spin_unlock(&lut->lut_translock); + + lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd); + if (lut->lut_last_rcvd != NULL) + rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync); + RETURN(rc); +} + +void 
lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut, + struct obd_export *exp) +{ + struct lsd_client_data *lcd = exp->exp_target_data.led_lcd; + + LASSERT(lut->lut_bottom); + /** VBR: set client last_epoch to current epoch */ + if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch) + return; + lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch; + lut_client_data_update(env, lut, exp); +} + +/** + * Update boot epoch when recovery ends + */ +void lut_boot_epoch_update(struct lu_target *lut) +{ + struct lu_env env; + struct ptlrpc_request *req; + __u32 start_epoch; + struct list_head client_list; + int rc; + + if (lut->lut_obd->obd_stopping) + return; + /** Increase server epoch after recovery */ + if (lut->lut_bottom == NULL) + return obt_boot_epoch_update(lut->lut_obd); + + rc = lu_env_init(&env, LCT_DT_THREAD); + if (rc) { + CERROR("Can't initialize environment rc=%i\n", rc); + return; + } + + spin_lock(&lut->lut_translock); + start_epoch = lr_epoch(lut->lut_last_transno) + 1; + lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS; + lut->lut_lsd.lsd_start_epoch = start_epoch; + spin_unlock(&lut->lut_translock); + + CFS_INIT_LIST_HEAD(&client_list); + /** + * The recovery is not yet finished and final queue can still be updated + * with resend requests. 
Move final list to separate one for processing + */ + spin_lock_bh(&lut->lut_obd->obd_processing_task_lock); + list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list); + spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock); + + /** + * go through list of exports participated in recovery and + * set new epoch for them + */ + list_for_each_entry(req, &client_list, rq_list) { + LASSERT(!req->rq_export->exp_delayed); + lut_client_epoch_update(&env, lut, req->rq_export); + } + /** return list back at once */ + spin_lock_bh(&lut->lut_obd->obd_processing_task_lock); + list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue); + spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock); + /** update server epoch */ + lut_server_data_update(&env, lut, 1); + lu_env_fini(&env); +} +EXPORT_SYMBOL(lut_boot_epoch_update); + +/** + * commit callback, need to update last_committed value + */ +void lut_cb_last_committed(struct lu_target *lut, __u64 transno, + void *data, int err) +{ + struct obd_export *exp = data; + + spin_lock(&lut->lut_translock); + if (transno > lut->lut_obd->obd_last_committed) + lut->lut_obd->obd_last_committed = transno; + + LASSERT(exp); + if (!lut->lut_obd->obd_stopping && + transno > exp->exp_last_committed) { + exp->exp_last_committed = transno; + spin_unlock(&lut->lut_translock); + ptlrpc_commit_replies(exp); + } else { + spin_unlock(&lut->lut_translock); + } + if (transno) + CDEBUG(D_HA, "%s: transno "LPD64" is committed\n", + lut->lut_obd->obd_name, transno); +} +EXPORT_SYMBOL(lut_cb_last_committed); + +void lut_cb_client(struct lu_target *lut, __u64 transno, + void *data, int err) +{ + LASSERT(lut->lut_obd); + target_client_add_cb(lut->lut_obd, transno, data, err); +} +EXPORT_SYMBOL(lut_cb_client); + +int lut_init(const struct lu_env *env, struct lu_target *lut, + struct obd_device *obd, struct dt_device *dt) +{ + struct lu_fid fid; + struct dt_object *o; + int rc = 0; + ENTRY; + + lut->lut_obd = obd; + lut->lut_bottom = dt; + 
lut->lut_last_rcvd = NULL; + + spin_lock_init(&lut->lut_translock); + spin_lock_init(&lut->lut_client_bitmap_lock); + spin_lock_init(&lut->lut_trans_table_lock); + + /** obdfilter has no lu_device stack yet */ + if (dt == NULL) + RETURN(rc); + o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid); + if (!IS_ERR(o)) { + lut->lut_last_rcvd = o; + } else { + rc = PTR_ERR(o); + CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc); + } + + RETURN(rc); +} +EXPORT_SYMBOL(lut_init); + +void lut_fini(const struct lu_env *env, struct lu_target *lut) +{ + ENTRY; + if (lut->lut_last_rcvd) + lu_object_put(env, &lut->lut_last_rcvd->do_lu); + lut->lut_last_rcvd = NULL; + EXIT; +} +EXPORT_SYMBOL(lut_fini); diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index c04abbe..ef3c783 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -399,6 +399,11 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ptlrpc_body, pb_limit)); LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, " found %lld\n", (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit)); + CLASSERT(PTLRPC_NUM_VERSIONS == 4); + LASSERTF((int)offsetof(struct ptlrpc_body, pb_pre_versions[4]) == 120, " found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body, pb_pre_versions[4])); + LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4]) == 8, " found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4])); /* Checks for struct obd_connect_data */ LASSERTF((int)sizeof(struct obd_connect_data) == 72, " found %lld\n", @@ -481,6 +486,7 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_AT == 0x01000000ULL); CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL); CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL); + CLASSERT(OBD_CONNECT_VBR == 0x80000000ULL); CLASSERT(OBD_CONNECT_SKIP_ORPHAN == 0x400000000ULL); /* Checks for struct obdo */ diff --git a/lustre/tests/Makefile.am b/lustre/tests/Makefile.am index 
884d9fe..a3711d8 100644 --- a/lustre/tests/Makefile.am +++ b/lustre/tests/Makefile.am @@ -11,7 +11,7 @@ noinst_SCRIPTS += sanity.sh rundbench acceptance-small.sh compile.sh noinst_SCRIPTS += conf-sanity.sh insanity.sh lfscktest.sh oos.sh oos2.sh noinst_SCRIPTS += llog-test.sh recovery-small.sh replay-dual.sh sanity-quota.sh noinst_SCRIPTS += replay-ost-single.sh replay-single.sh run-llog.sh sanityN.sh -noinst_SCRIPTS += runracer +noinst_SCRIPTS += runracer replay-vbr.sh noinst_SCRIPTS += performance-sanity.sh mdsrate-create-small.sh noinst_SCRIPTS += mdsrate-create-large.sh mdsrate-lookup-1dir.sh noinst_SCRIPTS += mdsrate-stat-small.sh mdsrate-stat-large.sh diff --git a/lustre/tests/replay-single.sh b/lustre/tests/replay-single.sh index fc79347..3fd3517 100755 --- a/lustre/tests/replay-single.sh +++ b/lustre/tests/replay-single.sh @@ -1459,19 +1459,19 @@ run_test 60 "test llog post recovery init vs llog unlink" #test race llog recovery thread vs llog cleanup test_61a() { # was test_61 remote_ost_nodsh && skip "remote OST with nodsh" && return 0 - + mkdir -p $DIR/$tdir createmany -o $DIR/$tdir/$tfile-%d 800 - replay_barrier ost1 -# OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221 - unlinkmany $DIR/$tdir/$tfile-%d 800 + replay_barrier ost1 +# OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221 + unlinkmany $DIR/$tdir/$tfile-%d 800 set_nodes_failloc "$(osts_nodes)" 0x80000221 facet_failover ost1 - sleep 10 + sleep 10 fail ost1 sleep 30 set_nodes_failloc "$(osts_nodes)" 0x0 - + $CHECKSTAT -t file $DIR/$tdir/$tfile-* && return 1 rmdir $DIR/$tdir } @@ -1481,7 +1481,7 @@ run_test 61a "test race llog recovery vs llog cleanup" test_61b() { # OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT 0x13a do_facet $SINGLEMDS "lctl set_param fail_loc=0x8000013a" - facet_failover $SINGLEMDS + facet_failover $SINGLEMDS sleep 10 fail $SINGLEMDS do_facet client dd if=/dev/zero of=$DIR/$tfile bs=4k count=1 || return 1 @@ -1492,10 +1492,10 @@ run_test 61b "test race mds llog sync vs llog cleanup" test_61c() { remote_ost_nodsh 
&& skip "remote OST with nodsh" && return 0 -# OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222 - touch $DIR/$tfile +# OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222 + touch $DIR/$tfile set_nodes_failloc "$(osts_nodes)" 0x80000222 - rm $DIR/$tfile + rm $DIR/$tfile sleep 10 fail ost1 set_nodes_failloc "$(osts_nodes)" 0x0 @@ -1850,6 +1850,54 @@ test_70b () { run_test 70b "mds recovery; $CLIENTCOUNT clients" # end multi-client tests +test_73a() { + multiop_bg_pause $DIR/$tfile O_tSc || return 3 + pid=$! + rm -f $DIR/$tfile + + replay_barrier $SINGLEMDS +#define OBD_FAIL_LDLM_ENQUEUE 0x302 + do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000302" + fail $SINGLEMDS + kill -USR1 $pid + wait $pid || return 1 + [ -e $DIR/$tfile ] && return 2 + return 0 +} +run_test 73a "open(O_CREAT), unlink, replay, reconnect before open replay , close" + +test_73b() { + multiop_bg_pause $DIR/$tfile O_tSc || return 3 + pid=$! + rm -f $DIR/$tfile + + replay_barrier $SINGLEMDS +#define OBD_FAIL_LDLM_REPLY 0x30c + do_facet $SINGLEMDS "lctl set_param fail_loc=0x8000030c" + fail $SINGLEMDS + kill -USR1 $pid + wait $pid || return 1 + [ -e $DIR/$tfile ] && return 2 + return 0 +} +run_test 73b "open(O_CREAT), unlink, replay, reconnect at open_replay reply, close" + +test_73c() { + multiop_bg_pause $DIR/$tfile O_tSc || return 3 + pid=$! 
+ rm -f $DIR/$tfile + + replay_barrier $SINGLEMDS +#define OBD_FAIL_TGT_LAST_REPLAY 0x710 + do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000710" + fail $SINGLEMDS + kill -USR1 $pid + wait $pid || return 1 + [ -e $DIR/$tfile ] && return 2 + return 0 +} +run_test 73c "open(O_CREAT), unlink, replay, reconnect at last_replay, close" + test_80a() { [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0 diff --git a/lustre/tests/replay-vbr.sh b/lustre/tests/replay-vbr.sh new file mode 100644 index 0000000..20b5e08 --- /dev/null +++ b/lustre/tests/replay-vbr.sh @@ -0,0 +1,701 @@ +#!/bin/bash + +set -e + +# bug number: 16356 +ALWAYS_EXCEPT="2 $REPLAY_VBR_EXCEPT" + +SAVE_PWD=$PWD +PTLDEBUG=${PTLDEBUG:--1} +LUSTRE=${LUSTRE:-`dirname $0`/..} +SETUP=${SETUP:-""} +CLEANUP=${CLEANUP:-""} +. $LUSTRE/tests/test-framework.sh + +init_test_env $@ + +. ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh} + +[ -n "$CLIENTS" ] || { skip "Need two or more clients" && exit 0; } +[ $CLIENTCOUNT -ge 2 ] || \ + { skip "Need two or more clients, have $CLIENTCOUNT" && exit 0; } +remote_mds_nodsh && skip "remote MDS with nodsh" && exit 0 + +[ "$SLOW" = "no" ] && EXCEPT_SLOW="" + + +[ ! "$NAME" = "ncli" ] && ALWAYS_EXCEPT="$ALWAYS_EXCEPT" +[ "$NAME" = "ncli" ] && MOUNT_2="" +MOUNT_2="" +build_test_filter + +check_and_setup_lustre +rm -rf $DIR/[df][0-9]* + +[ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE + +[ "$CLIENTS" ] && zconf_umount_clients $CLIENTS $DIR + +test_1() { + echo "mount client $CLIENT1,$CLIENT2..." 
+ zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + do_node $CLIENT2 mkdir -p $DIR/$tdir + replay_barrier $SINGLEMDS + do_node $CLIENT1 createmany -o $DIR/$tfile- 25 + do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 1 + do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25 + zconf_umount $CLIENT2 $DIR + + facet_failover $SINGLEMDS + # recovery shouldn't fail due to missing client 2 + do_node $CLIENT1 df $DIR || return 1 + + # All 50 files should have been replayed + do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2 + do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3 + + zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail" + [ -e $DIR/$tdir/$tfile-2-0 ] && error "$tfile-2-0 exists" + + zconf_umount_clients $CLIENTS $DIR + return 0 +} +run_test 1 "lost client doesn't affect another during replay" + +test_2() { + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + do_node $CLIENT2 mkdir -p $DIR/$tdir + replay_barrier $SINGLEMDS + do_node $CLIENT2 mcreate $DIR/$tdir/$tfile + do_node $CLIENT1 createmany -o $DIR/$tfile- 25 + #client1 read data from client2 which will be lost + do_node $CLIENT1 $CHECKSTAT $DIR/$tdir/$tfile + do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25 + zconf_umount $CLIENT2 $DIR + + facet_failover $SINGLEMDS + # recovery shouldn't fail due to missing client 2 + do_node $CLIENT1 df $DIR || return 1 + + # All 50 files should have been replayed + do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2 + do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3 + do_node $CLIENT1 $CHECKSTAT $DIR/$tdir/$tfile && return 4 + + zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail" + + zconf_umount_clients $CLIENTS $DIR + return 0 +} +run_test 2 "lost data due to missed REMOTE client during replay" + +test_3a() { + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + #make sure the time will change + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL 
set_param mdd.${!var}.atime_diff=0" || return + do_node $CLIENT1 touch $DIR/$tfile + do_node $CLIENT2 $CHECKSTAT $DIR/$tfile + sleep 1 + replay_barrier $SINGLEMDS + #change time + do_node $CLIENT2 touch $DIR/$tfile + do_node $CLIENT2 $CHECKSTAT $DIR/$tfile + #another change + do_node $CLIENT1 touch $DIR/$tfile + #remove file + do_node $CLIENT1 rm $DIR/$tfile + zconf_umount $CLIENT2 $DIR + + facet_failover $SINGLEMDS + # recovery shouldn't fail due to missing client 2 + do_node $CLIENT1 df $DIR || return 1 + do_node $CLIENT1 $CHECKSTAT $DIR/$tfile && return 2 + + zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail" + + zconf_umount_clients $CLIENTS $DIR + + return 0 +} +run_test 3a "setattr of time/size doesn't change version" + +test_3b() { + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + #make sure the time will change + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.atime_diff=0" || return + + do_node $CLIENT1 touch $DIR/$tfile + do_node $CLIENT2 $CHECKSTAT $DIR/$tfile + sleep 1 + replay_barrier $SINGLEMDS + #change mode + do_node $CLIENT2 chmod +x $DIR/$tfile + do_node $CLIENT2 $CHECKSTAT $DIR/$tfile + #another chmod + do_node $CLIENT1 chmod -x $DIR/$tfile + zconf_umount $CLIENT2 $DIR + + facet_failover $SINGLEMDS + # recovery should fail due to missing client 2 + do_node $CLIENT1 df $DIR && return 1 + + do_node $CLIENT1 $CHECKSTAT -p 755 $DIR/$tfile && return 2 + zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail" + + zconf_umount_clients $CLIENTS $DIR + + return 0 +} +run_test 3b "setattr of permissions changes version" + +vbr_deactivate_client() { + local client=$1 + echo "Deactivating client $client"; + do_node $client "sysctl -w lustre.fail_loc=0x50d" +} + +vbr_activate_client() { + local client=$1 + echo "Activating client $client"; + do_node $client "sysctl -w lustre.fail_loc=0x0" +} + +remote_server () +{ + local client=$1 + [ -z "$(do_node $client lctl dl | grep mdt)" ] && \ + 
[ -z "$(do_node $client lctl dl | grep ost)" ] +} + +test_4a() { + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 + [ $? -ne 0 ] && { skip "No delayed recovery support" && return; } + + remote_server $CLIENT2 || \ + { skip "Client $CLIENT2 is on the server node" && return 0; } + + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + do_node $CLIENT2 mkdir -p $DIR/$tdir + replay_barrier $SINGLEMDS + do_node $CLIENT1 createmany -o $DIR/$tfile- 25 + do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25 + do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25 + vbr_deactivate_client $CLIENT2 + + facet_failover $SINGLEMDS + do_node $CLIENT1 df $DIR || return 1 + + # All 50 files should have been replayed + do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2 + do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3 + + vbr_activate_client $CLIENT2 + do_node $CLIENT2 df $DIR || return 4 + # All 25 files from client2 should have been replayed + do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5 + + zconf_umount_clients $CLIENTS $DIR + return 0 +} +run_test 4a "fail MDS, delayed recovery" + +test_4b(){ + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 + [ $? 
-ne 0 ] && { skip "No delayed recovery support" && return; } + + remote_server $CLIENT2 || \ + { skip "Client $CLIENT2 is on the server node" && return 0; } + + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + replay_barrier $SINGLEMDS + do_node $CLIENT1 createmany -o $DIR/$tfile- 25 + do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25 + vbr_deactivate_client $CLIENT2 + + facet_failover $SINGLEMDS + do_node $CLIENT1 df $DIR || return 1 + + # create another set of files + do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25 + + vbr_activate_client $CLIENT2 + do_node $CLIENT2 df $DIR || return 2 + + # All files should have been replayed + do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3 + do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 4 + do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5 + + zconf_umount_clients $CLIENTS $DIR +} +run_test 4b "fail MDS, normal operation, delayed open recovery" + +test_4c() { + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 + [ $? 
-ne 0 ] && { skip "No delayed recovery support" && return; } + + remote_server $CLIENT2 || \ + { skip "Client $CLIENT2 is on the server node" && return 0; } + + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + replay_barrier $SINGLEMDS + do_node $CLIENT1 createmany -m $DIR/$tfile- 25 + do_node $CLIENT2 createmany -m $DIR/$tdir/$tfile-2- 25 + vbr_deactivate_client $CLIENT2 + + facet_failover $SINGLEMDS + do_node $CLIENT1 df $DIR || return 1 + + # create another set of files + do_node $CLIENT1 createmany -m $DIR/$tfile-3- 25 + + vbr_activate_client $CLIENT2 + do_node $CLIENT2 df $DIR || return 2 + + # All files should have been replayed + do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3 + do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 4 + do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5 + + zconf_umount_clients $CLIENTS $DIR +} +run_test 4c "fail MDS, normal operation, delayed recovery" + +test_5a() { + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 + [ $? 
-ne 0 ] && { skip "No delayed recovery support" && return; } + + remote_server $CLIENT2 || \ + { skip "Client $CLIENT2 is on the server node" && return 0; } + + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + replay_barrier $SINGLEMDS + do_node $CLIENT1 createmany -o $DIR/$tfile- 25 + do_node $CLIENT2 createmany -o $DIR/$tfile-2- 1 + do_node $CLIENT1 createmany -o $DIR/$tfile-3- 1 + vbr_deactivate_client $CLIENT2 + + facet_failover $SINGLEMDS + do_node $CLIENT1 df $DIR && return 1 + + vbr_activate_client $CLIENT2 + do_node $CLIENT2 df $DIR || return 2 + + # First 25 files should have been replayed + do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3 + # Third file fails due to missing client2 + do_node $CLIENT1 $CHECKSTAT $DIR/$tfile-3-0 && error "$tfile-3-0 exists" + # file from client2 should exist + do_node $CLIENT2 unlinkmany $DIR/$tfile-2- 1 || return 4 + + zconf_umount_clients $CLIENTS $DIR +} +run_test 5a "fail MDS, delayed recovery should fail" + +test_5b() { + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 + [ $? 
-ne 0 ] && { skip "No delayed recovery support" && return; } + + remote_server $CLIENT2 || \ + { skip "Client $CLIENT2 is on the server node" && return 0; } + + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + replay_barrier $SINGLEMDS + do_node $CLIENT1 createmany -o $DIR/$tfile- 25 + do_node $CLIENT2 createmany -o $DIR/$tfile-2- 1 + vbr_deactivate_client $CLIENT2 + + facet_failover $SINGLEMDS + do_node $CLIENT1 df $DIR || return 1 + do_node $CLIENT1 $CHECKSTAT $DIR/$tfile-2-0 && error "$tfile-2-0 exists" + + # create another set of files + do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25 + + vbr_activate_client $CLIENT2 + do_node $CLIENT2 df $DIR && return 4 + # file from client2 should fail + do_node $CLIENT2 $CHECKSTAT $DIR/$tfile-2-0 && error "$tfile-2-0 exists" + + # All 50 files from client 1 should have been replayed + do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2 + do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3 + + zconf_umount_clients $CLIENTS $DIR +} +run_test 5b "fail MDS, normal operation, delayed recovery should fail" + +test_6a() { + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 + [ $? 
-ne 0 ] && { skip "No delayed recovery support" && return; } + + remote_server $CLIENT2 || \ + { skip "Client $CLIENT2 is on the server node" && return 0; } + + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + do_node $CLIENT2 mkdir -p $DIR/$tdir + replay_barrier $SINGLEMDS + do_node $CLIENT1 createmany -o $DIR/$tfile- 25 + do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25 + do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25 + vbr_deactivate_client $CLIENT2 + + facet_failover $SINGLEMDS + # replay only 5 requests + do_node $CLIENT2 "sysctl -w lustre.fail_val=5" +#define OBD_FAIL_PTLRPC_REPLAY 0x50e + do_node $CLIENT2 "sysctl -w lustre.fail_loc=0x2000050e" + do_node $CLIENT2 df $DIR + # vbr_activate_client $CLIENT2 + # need way to know that client stops replays + sleep 5 + + facet_failover $SINGLEMDS + do_node $CLIENT1 df $DIR || return 1 + + # All files should have been replayed + do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2 + do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3 + do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5 + + zconf_umount_clients $CLIENTS $DIR + return 0 +} +run_test 6a "fail MDS, delayed recovery, fail MDS" + +test_7a() { + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 + [ $? 
-ne 0 ] && { skip "No delayed recovery support" && return; } + + remote_server $CLIENT2 || \ + { skip "Client $CLIENT2 is on the server node" && return 0; } + + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + do_node $CLIENT2 mkdir -p $DIR/$tdir + replay_barrier $SINGLEMDS + do_node $CLIENT1 createmany -o $DIR/$tfile- 25 + do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25 + do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25 + vbr_deactivate_client $CLIENT2 + + facet_failover $SINGLEMDS + vbr_activate_client $CLIENT2 + do_node $CLIENT2 df $DIR || return 4 + + facet_failover $SINGLEMDS + do_node $CLIENT1 df $DIR || return 1 + + # All files should have been replayed + do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2 + do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3 + do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5 + + zconf_umount_clients $CLIENTS $DIR + return 0 +} +run_test 7a "fail MDS, delayed recovery, fail MDS" + +rmultiop_start() { + local client=$1 + local file=$2 + + # We need to run do_node in bg, because pdsh does not exit + # if child process of run script exists. + # I.e. pdsh does not exit when runmultiop_bg_pause exited, + # because of multiop_bg_pause -> $MULTIOP_PROG & + # By the same reason we need sleep a bit after do_nodes starts + # to let runmultiop_bg_pause start muliop and + # update /tmp/multiop_bg.pid ; + # The rm /tmp/multiop_bg.pid guarantees here that + # we have the updated by runmultiop_bg_pause + # /tmp/multiop_bg.pid file + + local pid_file=$TMP/multiop_bg.pid.$$ + do_node $client "rm -f $pid_file && MULTIOP_PID_FILE=$pid_file LUSTRE= runmultiop_bg_pause $file O_tSc" & + local pid=$! 
+ sleep 3 + local multiop_pid + multiop_pid=$(do_node $client cat $pid_file) + [ -n "$multiop_pid" ] || error "$client : Can not get multiop_pid from $pid_file " + eval export ${client}_multiop_pid=$multiop_pid + eval export ${client}_do_node_pid=$pid + local var=${client}_multiop_pid + echo client $client multiop_bg started multiop_pid=${!var} + return $? +} + +rmultiop_stop() { + local client=$1 + local multiop_pid=${client}_multiop_pid + local do_node_pid=${client}_do_node_pid + + echo "Stopping multiop_pid=${!multiop_pid} (kill ${!multiop_pid} on $client)" + do_node $client kill -USR1 ${!multiop_pid} + + wait ${!do_node_pid} || true +} + +test_8a() { + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 + [ $? -ne 0 ] && { skip "No delayed recovery support" && return; } + + remote_server $CLIENT2 || \ + { skip "Client $CLIENT2 is on the server node" && return 0; } + + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + rmultiop_start $CLIENT2 $DIR/$tfile || return 1 + do_node $CLIENT2 rm -f $DIR/$tfile + replay_barrier $SINGLEMDS + rmultiop_stop $CLIENT2 || return 2 + + vbr_deactivate_client $CLIENT2 + facet_failover $SINGLEMDS + do_node $CLIENT1 df $DIR || return 3 + #client1 is back and will try to open orphan + vbr_activate_client $CLIENT2 + do_node $CLIENT2 df $DIR || return 4 + + do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists" + zconf_umount_clients $CLIENTS $DIR + return 0 +} +run_test 8a "orphans are kept until delayed recovery" + +test_8b() { + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 + [ $? 
-ne 0 ] && { skip "No delayed recovery support" && return; } + + remote_server $CLIENT2 || \ + { skip "Client $CLIENT2 is on the server node" && return 0; } + + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + rmultiop_start $CLIENT2 $DIR/$tfile || return 1 + replay_barrier $SINGLEMDS + do_node $CLIENT1 rm -f $DIR/$tfile + + vbr_deactivate_client $CLIENT2 + facet_failover $SINGLEMDS + do_node $CLIENT1 df $DIR || return 2 + #client1 is back and will try to open orphan + vbr_activate_client $CLIENT2 + do_node $CLIENT2 df $DIR || return 3 + + rmultiop_stop $CLIENT2 || return 1 + do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists" + zconf_umount_clients $CLIENTS $DIR + return 0 +} +run_test 8b "open1 | unlink2 X delayed_replay1, close1" + +test_8c() { + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 + [ $? -ne 0 ] && { skip "No delayed recovery support" && return; } + + remote_server $CLIENT2 || \ + { skip "Client $CLIENT2 is on the server node" && return 0; } + + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + rmultiop_start $CLIENT2 $DIR/$tfile || return 1 + replay_barrier $SINGLEMDS + do_node $CLIENT1 rm -f $DIR/$tfile + rmultiop_stop $CLIENT2 || return 2 + + vbr_deactivate_client $CLIENT2 + facet_failover $SINGLEMDS + do_node $CLIENT1 df $DIR || return 3 + #client1 is back and will try to open orphan + vbr_activate_client $CLIENT2 + do_node $CLIENT2 df $DIR || return 4 + + do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists" + zconf_umount_clients $CLIENTS $DIR + return 0 +} +run_test 8c "open1 | unlink2, close1 X delayed_replay1" + +test_8d() { + local var=${SINGLEMDS}_svc + do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1 + [ $? 
-ne 0 ] && { skip "No delayed recovery support" && return; } + + remote_server $CLIENT2 || \ + { skip "Client $CLIENT2 is on the server node" && return 0; } + + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + rmultiop_start $CLIENT1 $DIR/$tfile || return 1 + rmultiop_start $CLIENT2 $DIR/$tfile || return 2 + replay_barrier $SINGLEMDS + do_node $CLIENT1 rm -f $DIR/$tfile + rmultiop_stop $CLIENT2 || return 3 + rmultiop_stop $CLIENT1 || return 4 + + vbr_deactivate_client $CLIENT2 + facet_failover $SINGLEMDS + do_node $CLIENT1 df $DIR || return 6 + + #client1 is back and will try to open orphan + vbr_activate_client $CLIENT2 + do_node $CLIENT2 df $DIR || return 8 + + do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists" + zconf_umount_clients $CLIENTS $DIR + return 0 +} +run_test 8d "open1, open2 | unlink2, close1, close2 X delayed_replay1" + +test_8e() { + zconf_mount $CLIENT1 $DIR + zconf_mount $CLIENT2 $DIR + + do_node $CLIENT1 mcreate $DIR/$tfile + do_node $CLIENT1 mkdir $DIR/$tfile-2 + replay_barrier $SINGLEMDS + # missed replay from client1 will lead to recovery by versions + do_node $CLIENT1 touch $DIR/$tfile-2/$tfile + do_node $CLIENT2 rm $DIR/$tfile || return 1 + do_node $CLIENT2 touch $DIR/$tfile || return 2 + + zconf_umount $CLIENT1 $DIR + facet_failover $SINGLEMDS + do_node $CLIENT2 df $DIR || return 6 + + do_node $CLIENT2 rm $DIR/$tfile || error "$tfile doesn't exists" + zconf_umount_clients $CLIENTS $DIR + return 0 +} +run_test 8e "create | unlink, create shouldn't fail" + +test_8f() { + zconf_mount_clients $CLIENT1 $DIR + zconf_mount_clients $CLIENT2 $DIR + + do_node $CLIENT1 touch $DIR/$tfile + do_node $CLIENT1 mkdir $DIR/$tfile-2 + replay_barrier $SINGLEMDS + # missed replay from client1 will lead to recovery by versions + do_node $CLIENT1 touch $DIR/$tfile-2/$tfile + do_node $CLIENT2 rm -f $DIR/$tfile || return 1 + do_node $CLIENT2 mcreate $DIR/$tfile || return 2 + + zconf_umount $CLIENT1 $DIR + facet_failover 
$SINGLEMDS
+    do_node $CLIENT2 df $DIR || return 6
+
+    do_node $CLIENT2 rm $DIR/$tfile || error "$tfile doesn't exists"
+    zconf_umount $CLIENT2 $DIR
+    return 0
+}
+run_test 8f "create | unlink, create shouldn't fail"
+
+test_8g() {
+    zconf_mount_clients $CLIENT1 $DIR
+    zconf_mount_clients $CLIENT2 $DIR
+
+    do_node $CLIENT1 touch $DIR/$tfile
+    do_node $CLIENT1 mkdir $DIR/$tfile-2
+    replay_barrier $SINGLEMDS
+    # missed replay from client1 will lead to recovery by versions
+    do_node $CLIENT1 touch $DIR/$tfile-2/$tfile
+    do_node $CLIENT2 rm -f $DIR/$tfile || return 1
+    do_node $CLIENT2 mkdir $DIR/$tfile || return 2
+
+    zconf_umount $CLIENT1 $DIR
+    facet_failover $SINGLEMDS
+    do_node $CLIENT2 df $DIR || return 6
+
+    do_node $CLIENT2 rmdir $DIR/$tfile || error "$tfile doesn't exists"
+    zconf_umount $CLIENT2 $DIR
+    return 0
+}
+run_test 8g "create | unlink, create shouldn't fail"
+
+test_10 () { # run a dbench load on every client in $CLIENTS across an MDS failover; $CLIENT2 is deactivated before the failover and reactivated after it
+    local var=${SINGLEMDS}_svc
+    do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+    [ $? -ne 0 ] && { skip "No delayed recovery support" && return; } # skip only when the parameter cannot be read (same guard as the other tests in this script)
+    [ -z "$DBENCH_LIB" ] && skip "DBENCH_LIB is not set" && return 0
+
+    zconf_mount_clients $CLIENTS $DIR
+
+    local duration="-t 60"
+    local cmd="rundbench 1 $duration "
+    local PID=""
+    for CLIENT in ${CLIENTS//,/ }; do
+        $PDSH $CLIENT "set -x; PATH=:$PATH:$LUSTRE/utils:$LUSTRE/tests/:${DBENCH_LIB} DBENCH_LIB=${DBENCH_LIB} $cmd" &
+        PID=$!
+        echo $PID >pid.$CLIENT # remember the background job's PID so the final loop can wait on it
+        echo "Started load PID=`cat pid.$CLIENT`"
+    done
+
+    replay_barrier $SINGLEMDS
+    sleep 3 # give clients time to do operations
+
+    vbr_deactivate_client $CLIENT2
+
+    log "$TESTNAME fail $SINGLEMDS 1"
+    fail $SINGLEMDS
+
+# wait for client to reconnect to MDS
+    sleep $TIMEOUT
+
+    vbr_activate_client $CLIENT2
+    do_node $CLIENT2 df $DIR || return 4
+
+    for CLIENT in ${CLIENTS//,/ }; do
+        PID=`cat pid.$CLIENT`
+        wait $PID
+        rc=$?
+        echo "load on ${CLIENT} returned $rc"
+    done
+
+    zconf_umount_clients $CLIENTS $DIR
+}
+run_test 10 "mds version recovery; $CLIENTCOUNT clients"
+
+equals_msg `basename $0`: test complete, cleaning up
+#SLEEP=$((`date +%s` - $NOW))
+#[ $SLEEP -lt $TIMEOUT ] && sleep $SLEEP
+check_and_cleanup_lustre
+[ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG && grep -q FAIL $TESTSUITELOG && exit 1 || true
diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh
index c357ec8..e637384 100644
--- a/lustre/tests/sanity.sh
+++ b/lustre/tests/sanity.sh
@@ -4960,6 +4960,7 @@ test_120a() {
 	[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
 	lru_resize_enable mdc
 	lru_resize_enable osc
+# rm -rf $DIR/$tdir
 }
 run_test 120a "Early Lock Cancel: mkdir test"
 
@@ -4980,6 +4981,7 @@ test_120b() {
 	[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
 	lru_resize_enable mdc
 	lru_resize_enable osc
+# rm -rf $DIR/$tdir
 }
 run_test 120b "Early Lock Cancel: create test"
 
@@ -5002,6 +5004,7 @@ test_120c() {
 	[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
 	lru_resize_enable mdc
 	lru_resize_enable osc
+# rm -rf $DIR/$tdir
 }
 run_test 120c "Early Lock Cancel: link test"
 
@@ -5023,6 +5026,7 @@ test_120d() {
 	[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
 	lru_resize_enable mdc
 	lru_resize_enable osc
+# rm -rf $DIR/$tdir
 }
 run_test 120d "Early Lock Cancel: setattr test"
 
@@ -5050,6 +5054,7 @@ test_120e() {
 	[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
 	lru_resize_enable mdc
 	lru_resize_enable osc
+# rm -rf $DIR/$tdir
 }
 run_test 120e "Early Lock Cancel: unlink test"
 
@@ -5080,6 +5085,7 @@ test_120f() {
 	[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
lru_resize_enable mdc lru_resize_enable osc +# rm -rf $DIR/$tdir } run_test 120f "Early Lock Cancel: rename test" @@ -5121,6 +5127,7 @@ test_120g() { # wait for commitment of removal lru_resize_enable mdc lru_resize_enable osc +# rm -rf $DIR/$tdir } run_test 120g "Early Lock Cancel: performance test" diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index a3f2382..3c8dfaa 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -171,6 +171,8 @@ check_ptlrpc_body(void) CHECK_MEMBER(ptlrpc_body, pb_service_time); CHECK_MEMBER(ptlrpc_body, pb_slv); CHECK_MEMBER(ptlrpc_body, pb_limit); + CHECK_CVALUE(PTLRPC_NUM_VERSIONS); + CHECK_MEMBER(ptlrpc_body, pb_pre_versions[PTLRPC_NUM_VERSIONS]); } static void check_obd_connect_data(void) @@ -217,6 +219,7 @@ static void check_obd_connect_data(void) CHECK_CDEFINE(OBD_CONNECT_AT); CHECK_CDEFINE(OBD_CONNECT_CANCELSET); CHECK_CDEFINE(OBD_CONNECT_LRU_RESIZE); + CHECK_CDEFINE(OBD_CONNECT_VBR); CHECK_CDEFINE(OBD_CONNECT_SKIP_ORPHAN); } diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 5224858..3862e39 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -396,6 +396,11 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ptlrpc_body, pb_limit)); LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, " found %lld\n", (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit)); + CLASSERT(PTLRPC_NUM_VERSIONS == 4); + LASSERTF((int)offsetof(struct ptlrpc_body, pb_pre_versions[4]) == 120, " found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body, pb_pre_versions[4])); + LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4]) == 8, " found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4])); /* Checks for struct obd_connect_data */ LASSERTF((int)sizeof(struct obd_connect_data) == 72, " found %lld\n", @@ -478,6 +483,7 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_AT == 
0x01000000ULL); CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL); CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL); + CLASSERT(OBD_CONNECT_VBR == 0x80000000ULL); CLASSERT(OBD_CONNECT_SKIP_ORPHAN == 0x400000000ULL); /* Checks for struct obdo */