# include <lustre_quota.h>
#endif
-static struct obd_ops cmm_obd_device_ops = {
+struct obd_ops cmm_obd_device_ops = {
.o_owner = THIS_MODULE
};
RETURN(rc);
}
+/** Get object version for a CMM local object by delegating to the next
+ * (underlying MDD) layer via md_object_next(). */
+static dt_obj_version_t cml_version_get(const struct lu_env *env,
+ struct md_object *mo)
+{
+ return mo_version_get(env, md_object_next(mo));
+}
+
+/** Set object version for a CMM local object by delegating to the next
+ * (underlying MDD) layer. The "return" of a void call matches the style
+ * used by the other pass-through wrappers in this file. */
+static void cml_version_set(const struct lu_env *env, struct md_object *mo,
+ dt_obj_version_t version)
+{
+ return mo_version_set(env, md_object_next(mo), version);
+}
+
static const struct md_object_operations cml_mo_ops = {
.moo_permission = cml_permission,
.moo_attr_get = cml_attr_get,
.moo_readlink = cml_readlink,
.moo_capa_get = cml_capa_get,
.moo_object_sync = cml_object_sync,
+ .moo_version_get = cml_version_get,
+ .moo_version_set = cml_version_set,
.moo_path = cml_path,
};
return -EFAULT;
}
+/** Version-get stub for CMM remote objects: versions are only kept on the
+ * local MDT, so this must never be called for a remote object — LBUG()
+ * (kernel bug assertion) if it is. NOTE(review): relies on LBUG() not
+ * returning, since no value is returned after it. */
+static dt_obj_version_t cmr_version_get(const struct lu_env *env,
+ struct md_object *mo)
+{
+ LBUG();
+}
+
+/** Version-set stub for CMM remote objects: must never be called — see
+ * cmr_version_get(). */
+static void cmr_version_set(const struct lu_env *env, struct md_object *mo,
+ dt_obj_version_t version)
+{
+ LBUG();
+}
+
static const struct md_object_operations cmr_mo_ops = {
.moo_permission = cmr_permission,
.moo_attr_get = cmr_attr_get,
.moo_readlink = cmr_readlink,
.moo_capa_get = cmr_capa_get,
.moo_object_sync = cmr_object_sync,
+ .moo_version_get = cmr_version_get,
+ .moo_version_set = cmr_version_set,
.moo_path = cmr_path,
};
md_object.h dt_object.h lustre_param.h lustre_mdt.h \
lustre_fid.h lustre_fld.h lustre_req_layout.h lustre_capa.h \
lustre_idmap.h lustre_eacl.h interval_tree.h obd_cksum.h \
- lu_ref.h cl_object.h lustre_acl.h lclient.h
+ lu_ref.h cl_object.h lustre_acl.h lclient.h lu_target.h
enum dt_format_type dt_mode_to_dft(__u32 mode);
+/** Version type. May differ in DMU and ldiskfs */
+typedef __u64 dt_obj_version_t;
+
/**
* Per-dt-object operations.
*/
struct lustre_capa *old,
__u64 opc);
int (*do_object_sync)(const struct lu_env *, struct dt_object *);
+ dt_obj_version_t (*do_version_get)(const struct lu_env *env,
+ struct dt_object *dt);
+ void (*do_version_set)(const struct lu_env *env, struct dt_object *dt,
+ dt_obj_version_t new_version);
/**
* Get object info of next level. Currently, only get inode from osd.
* This is only used by quota b=16542
int (*dtc_txn_commit)(const struct lu_env *env,
struct thandle *txn, void *cookie);
void *dtc_cookie;
+ __u32 dtc_tag;
struct list_head dtc_linkage;
};
struct dt_device *dev,
const struct lu_fid *fid);
+/** Read the on-disk object version through the dt_object_operations vector;
+ * asserts the backend implements do_version_get. */
+static inline dt_obj_version_t do_version_get(const struct lu_env *env,
+ struct dt_object *o)
+{
+ LASSERT(o->do_ops->do_version_get);
+ return o->do_ops->do_version_get(env, o);
+}
+
+/** Store a new object version through the dt_object_operations vector;
+ * asserts the backend implements do_version_set. */
+static inline void do_version_set(const struct lu_env *env,
+ struct dt_object *o, dt_obj_version_t v)
+{
+ LASSERT(o->do_ops->do_version_set);
+ return o->do_ops->do_version_set(env, o, v);
+}
+
+int dt_record_read(const struct lu_env *env, struct dt_object *dt,
+ struct lu_buf *buf, loff_t *pos);
+int dt_record_write(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, loff_t *pos, struct thandle *th);
+
+
+/** Start a transaction on dt device \a d with parameters \a p.
+ * Returns the transaction handle from the backend (or its error form —
+ * TODO confirm ERR_PTR vs NULL convention with the backend). */
+static inline struct thandle *dt_trans_start(const struct lu_env *env,
+ struct dt_device *d,
+ struct txn_param *p)
+{
+ LASSERT(d->dd_ops->dt_trans_start);
+ return d->dd_ops->dt_trans_start(env, d, p);
+}
+
+/** Stop (commit/close) transaction \a th. The device argument is used only
+ * to reach the operations vector; the handle itself identifies the
+ * transaction passed to the backend. */
+static inline void dt_trans_stop(const struct lu_env *env,
+ struct dt_device *d,
+ struct thandle *th)
+{
+ LASSERT(d->dd_ops->dt_trans_stop);
+ return d->dd_ops->dt_trans_stop(env, th);
+}
/** @} dt */
#endif /* __LUSTRE_DT_OBJECT_H */
static inline __u64 fsfilt_get_version(struct obd_device *obd,
struct inode *inode)
{
- if (obd->obd_fsops->fs_set_version)
+ if (obd->obd_fsops->fs_get_version)
return obd->obd_fsops->fs_get_version(inode);
return -EOPNOTSUPP;
}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ */
+
+#ifndef _LUSTRE_LU_TARGET_H
+#define _LUSTRE_LU_TARGET_H
+
+#include <dt_object.h>
+#include <lustre_disk.h>
+
+/** Common per-target (MDT/OST) server state: last_rcvd bookkeeping,
+ * transno tracking and client bitmap used by recovery (VBR). */
+struct lu_target {
+ /** Owning OBD device */
+ struct obd_device *lut_obd;
+ /** Bottom dt device of the stack (storage backend) — TODO confirm */
+ struct dt_device *lut_bottom;
+ /** last_rcvd file */
+ struct dt_object *lut_last_rcvd;
+ /* transaction callbacks */
+ struct dt_txn_callback lut_txn_cb;
+ /** server data in last_rcvd file */
+ struct lr_server_data lut_lsd;
+ /** Server last transaction number */
+ __u64 lut_last_transno;
+ /** Lock protecting last transaction number */
+ spinlock_t lut_translock;
+ /** Lock protecting client bitmap */
+ spinlock_t lut_client_bitmap_lock;
+ /** Bitmap of known clients */
+ unsigned long lut_client_bitmap[LR_CLIENT_BITMAP_SIZE];
+ /** Number of mounts */
+ __u64 lut_mount_count;
+ /** Age threshold for stale (delayed) exports — TODO confirm units */
+ __u32 lut_stale_export_age;
+ /** NOTE(review): presumably guards lsd_trans_table — confirm */
+ spinlock_t lut_trans_table_lock;
+};
+
+typedef void (*lut_cb_t)(struct lu_target *lut, __u64 transno,
+ void *data, int err);
+struct lut_commit_cb {
+ lut_cb_t lut_cb_func;
+ void *lut_cb_data;
+};
+
+void lut_boot_epoch_update(struct lu_target *);
+void lut_cb_last_committed(struct lu_target *, __u64, void *, int);
+void lut_cb_client(struct lu_target *, __u64, void *, int);
+int lut_init(const struct lu_env *, struct lu_target *,
+ struct obd_device *, struct dt_device *);
+void lut_fini(const struct lu_env *, struct lu_target *);
+
+#endif /* _LUSTRE_LU_TARGET_H */
OBD_CONNECT_RMT_CLIENT_FORCE | \
OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | \
OBD_CONNECT_MDS_MDS | OBD_CONNECT_FID | \
- LRU_RESIZE_CONNECT_FLAG | \
+ LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_VBR | \
OBD_CONNECT_LOV_V3)
#define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
LRU_RESIZE_CONNECT_FLAG | OBD_CONNECT_CKSUM | \
OBD_CONNECT_CHANGE_QS | \
OBD_CONNECT_OSS_CAPA | OBD_CONNECT_RMT_CLIENT | \
- OBD_CONNECT_RMT_CLIENT_FORCE | \
+ OBD_CONNECT_RMT_CLIENT_FORCE | OBD_CONNECT_VBR | \
OBD_CONNECT_MDS | OBD_CONNECT_SKIP_ORPHAN | \
- OBD_CONNECT_GRANT_SHRINK)
+ OBD_CONNECT_GRANT_SHRINK)
#define ECHO_CONNECT_SUPPORTED (0)
#define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION | OBD_CONNECT_AT)
/****************** last_rcvd file *********************/
+/** version recovery epoch */
+#define LR_EPOCH_BITS 32
+#define lr_epoch(a) ((a) >> LR_EPOCH_BITS)
+#define LR_EXPIRE_INTERVALS 16 /**< number of intervals to track transno */
+
#define LR_SERVER_SIZE 512
#define LR_CLIENT_START 8192
#define LR_CLIENT_SIZE 128
#define LR_MAX_CLIENTS (CFS_PAGE_SIZE * 8)
#endif
+#define LR_CLIENT_BITMAP_SIZE ((LR_MAX_CLIENTS >> 3) / sizeof(long))
+
/** COMPAT_146: this is an OST (temporary) */
#define OBD_COMPAT_OST 0x00000002
/** COMPAT_146: this is an MDT (temporary) */
__u8 lsd_peeruuid[40]; /* UUID of MDS associated with this OST */
__u32 lsd_ost_index; /* index number of OST in LOV */
__u32 lsd_mdt_index; /* index number of MDT in LMV */
- __u8 lsd_padding[LR_SERVER_SIZE - 148];
+ __u32 lsd_start_epoch; /* VBR: start epoch from last boot */
+ /** transaction values since lsd_trans_table_time */
+ __u64 lsd_trans_table[LR_EXPIRE_INTERVALS];
+ /** start point of transno table below */
+ __u32 lsd_trans_table_time; /* time of first slot in table above */
+ __u32 lsd_expire_intervals; /* LR_EXPIRE_INTERVALS */
+ __u8 lsd_padding[LR_SERVER_SIZE - 288];
};
/* Data stored per client in the last_rcvd file. In le32 order. */
__u64 lcd_last_close_xid; /* xid for the last transaction */
__u32 lcd_last_close_result; /* result from last RPC */
__u32 lcd_last_close_data; /* per-op data */
- __u8 lcd_padding[LR_CLIENT_SIZE - 88];
+ /* VBR: last versions */
+ __u64 lcd_pre_versions[4];
+ __u32 lcd_last_epoch;
+ /** orphans handling for delayed export rely on that */
+ __u32 lcd_first_epoch;
+ __u8 lcd_padding[LR_CLIENT_SIZE - 128];
};
+/* last_rcvd handling */
+/** Convert lr_server_data from on-disk little-endian layout (\a buf) to CPU
+ * byte order (\a lsd). Field list must be kept in sync with lsd_cpu_to_le().
+ * lsd_padding is intentionally not copied. */
+static inline void lsd_le_to_cpu(struct lr_server_data *buf,
+ struct lr_server_data *lsd)
+{
+ int i;
+ memcpy(lsd->lsd_uuid, buf->lsd_uuid, sizeof (lsd->lsd_uuid));
+ lsd->lsd_last_transno = le64_to_cpu(buf->lsd_last_transno);
+ lsd->lsd_compat14 = le64_to_cpu(buf->lsd_compat14);
+ lsd->lsd_mount_count = le64_to_cpu(buf->lsd_mount_count);
+ lsd->lsd_feature_compat = le32_to_cpu(buf->lsd_feature_compat);
+ lsd->lsd_feature_rocompat = le32_to_cpu(buf->lsd_feature_rocompat);
+ lsd->lsd_feature_incompat = le32_to_cpu(buf->lsd_feature_incompat);
+ lsd->lsd_server_size = le32_to_cpu(buf->lsd_server_size);
+ lsd->lsd_client_start = le32_to_cpu(buf->lsd_client_start);
+ lsd->lsd_client_size = le16_to_cpu(buf->lsd_client_size);
+ lsd->lsd_subdir_count = le16_to_cpu(buf->lsd_subdir_count);
+ lsd->lsd_catalog_oid = le64_to_cpu(buf->lsd_catalog_oid);
+ lsd->lsd_catalog_ogen = le32_to_cpu(buf->lsd_catalog_ogen);
+ memcpy(lsd->lsd_peeruuid, buf->lsd_peeruuid, sizeof(lsd->lsd_peeruuid));
+ lsd->lsd_ost_index = le32_to_cpu(buf->lsd_ost_index);
+ lsd->lsd_mdt_index = le32_to_cpu(buf->lsd_mdt_index);
+ lsd->lsd_start_epoch = le32_to_cpu(buf->lsd_start_epoch);
+ for (i = 0; i < LR_EXPIRE_INTERVALS; i++)
+ lsd->lsd_trans_table[i] = le64_to_cpu(buf->lsd_trans_table[i]);
+ lsd->lsd_trans_table_time = le32_to_cpu(buf->lsd_trans_table_time);
+ lsd->lsd_expire_intervals = le32_to_cpu(buf->lsd_expire_intervals);
+}
+
+/** Convert lr_server_data from CPU byte order (\a lsd) to on-disk
+ * little-endian layout (\a buf) — exact inverse of lsd_le_to_cpu(). */
+static inline void lsd_cpu_to_le(struct lr_server_data *lsd,
+ struct lr_server_data *buf)
+{
+ int i;
+ memcpy(buf->lsd_uuid, lsd->lsd_uuid, sizeof (buf->lsd_uuid));
+ buf->lsd_last_transno = cpu_to_le64(lsd->lsd_last_transno);
+ buf->lsd_compat14 = cpu_to_le64(lsd->lsd_compat14);
+ buf->lsd_mount_count = cpu_to_le64(lsd->lsd_mount_count);
+ buf->lsd_feature_compat = cpu_to_le32(lsd->lsd_feature_compat);
+ buf->lsd_feature_rocompat = cpu_to_le32(lsd->lsd_feature_rocompat);
+ buf->lsd_feature_incompat = cpu_to_le32(lsd->lsd_feature_incompat);
+ buf->lsd_server_size = cpu_to_le32(lsd->lsd_server_size);
+ buf->lsd_client_start = cpu_to_le32(lsd->lsd_client_start);
+ buf->lsd_client_size = cpu_to_le16(lsd->lsd_client_size);
+ buf->lsd_subdir_count = cpu_to_le16(lsd->lsd_subdir_count);
+ buf->lsd_catalog_oid = cpu_to_le64(lsd->lsd_catalog_oid);
+ buf->lsd_catalog_ogen = cpu_to_le32(lsd->lsd_catalog_ogen);
+ memcpy(buf->lsd_peeruuid, lsd->lsd_peeruuid, sizeof(buf->lsd_peeruuid));
+ buf->lsd_ost_index = cpu_to_le32(lsd->lsd_ost_index);
+ buf->lsd_mdt_index = cpu_to_le32(lsd->lsd_mdt_index);
+ buf->lsd_start_epoch = cpu_to_le32(lsd->lsd_start_epoch);
+ for (i = 0; i < LR_EXPIRE_INTERVALS; i++)
+ buf->lsd_trans_table[i] = cpu_to_le64(lsd->lsd_trans_table[i]);
+ buf->lsd_trans_table_time = cpu_to_le32(lsd->lsd_trans_table_time);
+ buf->lsd_expire_intervals = cpu_to_le32(lsd->lsd_expire_intervals);
+}
+
+/** Convert per-client last_rcvd record from on-disk little-endian layout
+ * (\a buf) to CPU byte order (\a lcd), including the VBR pre-version array
+ * and epoch fields. Keep in sync with lcd_cpu_to_le(). */
+static inline void lcd_le_to_cpu(struct lsd_client_data *buf,
+ struct lsd_client_data *lcd)
+{
+ memcpy(lcd->lcd_uuid, buf->lcd_uuid, sizeof (lcd->lcd_uuid));
+ lcd->lcd_last_transno = le64_to_cpu(buf->lcd_last_transno);
+ lcd->lcd_last_xid = le64_to_cpu(buf->lcd_last_xid);
+ lcd->lcd_last_result = le32_to_cpu(buf->lcd_last_result);
+ lcd->lcd_last_data = le32_to_cpu(buf->lcd_last_data);
+ lcd->lcd_last_close_transno = le64_to_cpu(buf->lcd_last_close_transno);
+ lcd->lcd_last_close_xid = le64_to_cpu(buf->lcd_last_close_xid);
+ lcd->lcd_last_close_result = le32_to_cpu(buf->lcd_last_close_result);
+ lcd->lcd_last_close_data = le32_to_cpu(buf->lcd_last_close_data);
+ lcd->lcd_pre_versions[0] = le64_to_cpu(buf->lcd_pre_versions[0]);
+ lcd->lcd_pre_versions[1] = le64_to_cpu(buf->lcd_pre_versions[1]);
+ lcd->lcd_pre_versions[2] = le64_to_cpu(buf->lcd_pre_versions[2]);
+ lcd->lcd_pre_versions[3] = le64_to_cpu(buf->lcd_pre_versions[3]);
+ lcd->lcd_last_epoch = le32_to_cpu(buf->lcd_last_epoch);
+ lcd->lcd_first_epoch = le32_to_cpu(buf->lcd_first_epoch);
+}
+
+/** Convert per-client last_rcvd record from CPU byte order (\a lcd) to
+ * on-disk little-endian layout (\a buf) — inverse of lcd_le_to_cpu(). */
+static inline void lcd_cpu_to_le(struct lsd_client_data *lcd,
+ struct lsd_client_data *buf)
+{
+ memcpy(buf->lcd_uuid, lcd->lcd_uuid, sizeof (lcd->lcd_uuid));
+ buf->lcd_last_transno = cpu_to_le64(lcd->lcd_last_transno);
+ buf->lcd_last_xid = cpu_to_le64(lcd->lcd_last_xid);
+ buf->lcd_last_result = cpu_to_le32(lcd->lcd_last_result);
+ buf->lcd_last_data = cpu_to_le32(lcd->lcd_last_data);
+ buf->lcd_last_close_transno = cpu_to_le64(lcd->lcd_last_close_transno);
+ buf->lcd_last_close_xid = cpu_to_le64(lcd->lcd_last_close_xid);
+ buf->lcd_last_close_result = cpu_to_le32(lcd->lcd_last_close_result);
+ buf->lcd_last_close_data = cpu_to_le32(lcd->lcd_last_close_data);
+ buf->lcd_pre_versions[0] = cpu_to_le64(lcd->lcd_pre_versions[0]);
+ buf->lcd_pre_versions[1] = cpu_to_le64(lcd->lcd_pre_versions[1]);
+ buf->lcd_pre_versions[2] = cpu_to_le64(lcd->lcd_pre_versions[2]);
+ buf->lcd_pre_versions[3] = cpu_to_le64(lcd->lcd_pre_versions[3]);
+ buf->lcd_last_epoch = cpu_to_le32(lcd->lcd_last_epoch);
+ buf->lcd_first_epoch = cpu_to_le32(lcd->lcd_first_epoch);
+}
+
+/** Highest transno recorded for this client: max of the regular and
+ * close-operation last transnos. */
+static inline __u64 lcd_last_transno(struct lsd_client_data *lcd)
+{
+ return (lcd->lcd_last_transno > lcd->lcd_last_close_transno ?
+ lcd->lcd_last_transno : lcd->lcd_last_close_transno);
+}
+
+/** Highest xid recorded for this client: max of the regular and
+ * close-operation last xids. */
+static inline __u64 lcd_last_xid(struct lsd_client_data *lcd)
+{
+ return (lcd->lcd_last_xid > lcd->lcd_last_close_xid ?
+ lcd->lcd_last_xid : lcd->lcd_last_close_xid);
+}
/****************** superblock additional info *********************/
#ifdef __KERNEL__
/****************** prototypes *********************/
#ifdef __KERNEL__
-#include <obd_class.h>
/* obd_mount.c */
void lustre_register_client_fill_super(int (*cfs)(struct super_block *sb));
int lustre_common_put_super(struct super_block *sb);
-int lustre_process_log(struct super_block *sb, char *logname,
- struct config_llog_instance *cfg);
-int lustre_end_log(struct super_block *sb, char *logname,
- struct config_llog_instance *cfg);
struct lustre_mount_info *server_get_mount(const char *name);
struct lustre_mount_info *server_get_mount_2(const char *name);
int server_put_mount(const char *name, struct vfsmount *mnt);
#include <lprocfs_status.h>
#include <class_hash.h>
-/* Data stored per client in the last_rcvd file. In le32 order. */
struct mds_client_data;
struct mdt_client_data;
struct mds_idmap_table;
struct mdt_idmap_table;
+struct lu_export_data {
+ /** Protects led_lcd below */
+ struct semaphore led_lcd_lock;
+ /** Per-client data for each export */
+ struct lsd_client_data *led_lcd;
+ /** Offset of record in last_rcvd file */
+ loff_t led_lr_off;
+ /** Client index in last_rcvd file */
+ int led_lr_idx;
+};
+
struct mdt_export_data {
+ struct lu_export_data med_led;
struct list_head med_open_head;
spinlock_t med_open_lock; /* lock med_open_head, mfd_list*/
- struct semaphore med_lcd_lock;
- struct lsd_client_data *med_lcd;
__u64 med_ibits_known;
- loff_t med_lr_off;
- int med_lr_idx;
struct semaphore med_idmap_sem;
struct lustre_idmap_table *med_idmap;
};
+#define med_lcd_lock med_led.led_lcd_lock
+#define med_lcd med_led.led_lcd
+#define med_lr_off med_led.led_lr_off
+#define med_lr_idx med_led.led_lr_idx
+
struct osc_creator {
spinlock_t oscc_lock;
struct list_head oscc_list;
/* In-memory access to client data from OST struct */
struct filter_export_data {
- spinlock_t fed_lock; /* protects fed_open_head */
- struct lsd_client_data *fed_lcd;
- loff_t fed_lr_off;
- int fed_lr_idx;
+ struct lu_export_data fed_led;
+ spinlock_t fed_lock; /**< protects fed_mod_list */
long fed_dirty; /* in bytes */
long fed_grant; /* in bytes */
struct list_head fed_mod_list; /* files being modified */
__u32 fed_group;
};
+#define fed_lcd_lock fed_led.led_lcd_lock
+#define fed_lcd fed_led.led_lcd
+#define fed_lr_off fed_led.led_lr_off
+#define fed_lr_idx fed_led.led_lr_idx
+
typedef struct nid_stat_uuid {
struct list_head ns_uuid_list;
struct obd_uuid ns_uuid;
lustre_hash_t *exp_lock_hash; /* existing lock hash */
spinlock_t exp_lock_hash_lock;
struct list_head exp_outstanding_replies;
- time_t exp_last_request_time;
+ struct list_head exp_uncommitted_replies;
+ spinlock_t exp_uncommitted_replies_lock;
+ __u64 exp_last_committed;
+ cfs_time_t exp_last_request_time;
struct list_head exp_req_replay_queue;
spinlock_t exp_lock; /* protects flags int below */
/* ^ protects exp_outstanding_replies too */
exp_in_recovery:1,
exp_disconnected:1,
exp_connecting:1,
+ /** VBR: export missed recovery */
+ exp_delayed:1,
+ /** VBR: failed version checking */
+ exp_vbr_failed:1,
exp_req_replay_needed:1,
exp_lock_replay_needed:1,
exp_need_sync:1,
cfs_time_t exp_flvr_expire[2]; /* seconds */
union {
+ struct lu_export_data eu_target_data;
struct mdt_export_data eu_mdt_data;
struct filter_export_data eu_filter_data;
struct ec_export_data eu_ec_data;
} u;
};
+#define exp_target_data u.eu_target_data
#define exp_mdt_data u.eu_mdt_data
#define exp_filter_data u.eu_filter_data
#define exp_ec_data u.eu_ec_data
+/** Return nonzero if a delayed (VBR) export's last request is older than
+ * \a age; only valid for exports already marked exp_delayed. */
+static inline int exp_expired(struct obd_export *exp, cfs_duration_t age)
+{
+ LASSERT(exp->exp_delayed);
+ return cfs_time_before(cfs_time_add(exp->exp_last_request_time, age),
+ cfs_time_current_sec());
+}
+
static inline int exp_connect_cancelset(struct obd_export *exp)
{
LASSERT(exp != NULL);
OBD_CONNECT_RMT_CLIENT);
}
+/** Return 1 if the client behind this export negotiated version-based
+ * recovery (OBD_CONNECT_VBR), 0 otherwise. */
+static inline int exp_connect_vbr(struct obd_export *exp)
+{
+ LASSERT(exp != NULL);
+ LASSERT(exp->exp_connection);
+ return !!(exp->exp_connect_flags & OBD_CONNECT_VBR);
+}
+
static inline int imp_connect_lru_resize(struct obd_import *imp)
{
struct obd_connect_data *ocd;
imp_server_timeout:1, /* use 1/2 timeout on MDS' OSCs */
imp_initial_recov:1, /* retry the initial connection */
imp_initial_recov_bk:1, /* turn off init_recov after trying all failover nids */
+ imp_delayed_recovery:1, /* VBR: imp in delayed recovery */
+ imp_no_lock_replay:1, /* VBR: if gap was found then no lock replays */
+ imp_vbr_failed:1, /* recovery by versions was failed */
imp_force_verify:1, /* force an immidiate ping */
imp_pingable:1, /* pingable */
imp_resend_replay:1, /* resend for replay */
/* target.c */
struct ptlrpc_request;
-struct recovd_data;
-struct recovd_obd;
struct obd_export;
+struct lu_target;
#include <lustre_ha.h>
#include <lustre_net.h>
#include <lvfs.h>
#define OBD_RECOVERY_MAX_TIME (obd_timeout * 18) /* b13079 */
void target_cancel_recovery_timer(struct obd_device *obd);
-int target_start_recovery_thread(struct obd_device *obd,
- svc_handler_t handler);
void target_stop_recovery_thread(struct obd_device *obd);
void target_cleanup_recovery(struct obd_device *obd);
int target_queue_recovery_request(struct ptlrpc_request *req,
#error Unsupported operating system.
#endif
-#include <obd.h>
+#include <obd_class.h>
#include <obd_ost.h>
#include <lustre/lustre_idl.h>
/**
* The refcount for lcm
*/
- atomic_t lcm_refcount;
+ atomic_t lcm_refcount;
/**
* Thread control structure. Used for control commit thread.
*/
char lcm_name[LCM_NAME_SIZE];
};
-static inline struct llog_commit_master
+static inline struct llog_commit_master
*lcm_get(struct llog_commit_master *lcm)
{
LASSERT(atomic_read(&lcm->lcm_refcount) > 0);
return lcm;
}
-static inline void
+static inline void
lcm_put(struct llog_commit_master *lcm)
{
if (!atomic_dec_and_test(&lcm->lcm_refcount)) {
return ;
}
- OBD_FREE_PTR(lcm);
+ OBD_FREE_PTR(lcm);
}
struct llog_canceld_ctxt {
RETURN(rc);
}
+int lustre_process_log(struct super_block *sb, char *logname,
+ struct config_llog_instance *cfg);
+int lustre_end_log(struct super_block *sb, char *logname,
+ struct config_llog_instance *cfg);
+
#endif
/* ptlrpc/service.c */
void ptlrpc_save_lock (struct ptlrpc_request *req,
struct lustre_handle *lock, int mode, int no_ack);
-void ptlrpc_commit_replies (struct obd_device *obd);
+void ptlrpc_commit_replies(struct obd_export *exp);
void ptlrpc_dispatch_difficult_reply (struct ptlrpc_reply_state *rs);
void ptlrpc_schedule_difficult_reply (struct ptlrpc_reply_state *rs);
struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
__u32 lustre_msg_get_opc(struct lustre_msg *msg);
__u64 lustre_msg_get_last_xid(struct lustre_msg *msg);
__u64 lustre_msg_get_last_committed(struct lustre_msg *msg);
+__u64 *lustre_msg_get_versions(struct lustre_msg *msg);
__u64 lustre_msg_get_transno(struct lustre_msg *msg);
__u64 lustre_msg_get_slv(struct lustre_msg *msg);
__u32 lustre_msg_get_limit(struct lustre_msg *msg);
void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc);
void lustre_msg_set_last_xid(struct lustre_msg *msg, __u64 last_xid);
void lustre_msg_set_last_committed(struct lustre_msg *msg,__u64 last_committed);
+void lustre_msg_set_versions(struct lustre_msg *msg, __u64 *versions);
void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno);
void lustre_msg_set_status(struct lustre_msg *msg, __u32 status);
void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt);
int priority);
int client_import_del_conn(struct obd_import *imp, struct obd_uuid *uuid);
int import_set_conn_priority(struct obd_import *imp, struct obd_uuid *uuid);
+void client_destroy_import(struct obd_import *imp);
/* ptlrpc/pinger.c */
enum timeout_event {
/*
* super-class definitions.
*/
-#include <lu_object.h>
+#include <dt_object.h>
#include <lvfs.h>
struct md_device;
struct lustre_capa *, int renewal);
int (*moo_object_sync)(const struct lu_env *, struct md_object *);
-
+ dt_obj_version_t (*moo_version_get)(const struct lu_env *,
+ struct md_object *);
+ void (*moo_version_set)(const struct lu_env *, struct md_object *,
+ dt_obj_version_t);
int (*moo_path)(const struct lu_env *env, struct md_object *obj,
char *path, int pathlen, __u64 *recno, int *linkno);
};
return m->mo_ops->moo_object_sync(env, m);
}
+/** Read the metadata object's version through md_object_operations;
+ * asserts the layer implements moo_version_get. */
+static inline dt_obj_version_t mo_version_get(const struct lu_env *env,
+ struct md_object *m)
+{
+ LASSERT(m->mo_ops->moo_version_get);
+ return m->mo_ops->moo_version_get(env, m);
+}
+
+/** Store a new version on the metadata object through
+ * md_object_operations; asserts the layer implements moo_version_set. */
+static inline void mo_version_set(const struct lu_env *env,
+ struct md_object *m, dt_obj_version_t ver)
+{
+ LASSERT(m->mo_ops->moo_version_set);
+ return m->mo_ops->moo_version_set(env, m, ver);
+}
+
static inline int mdo_lookup(const struct lu_env *env,
struct md_object *p,
const struct lu_name *lname,
#define IOC_MDC_MAX_NR 50
#include <lustre/lustre_idl.h>
-#include <lu_object.h>
+#include <lu_target.h>
#include <lu_ref.h>
#include <lustre_lib.h>
#include <lustre_export.h>
/* hold common fields for "target" device */
struct obd_device_target {
struct super_block *obt_sb;
+ /** last_rcvd file */
+ struct file *obt_rcvd_filp;
+ /** server data in last_rcvd file */
+ struct lr_server_data *obt_lsd;
+ /** Lock protecting client bitmap */
+ spinlock_t obt_client_bitmap_lock;
+ /** Bitmap of known clients */
+ unsigned long *obt_client_bitmap;
+ /** Server last transaction number */
+ __u64 obt_last_transno;
+ /** Lock protecting last transaction number */
+ spinlock_t obt_translock;
+ /** Number of mounts */
+ __u64 obt_mount_count;
atomic_t obt_quotachecking;
struct lustre_quota_ctxt obt_qctxt;
lustre_quota_version_t obt_qfmt;
struct filter_obd {
/* NB this field MUST be first */
struct obd_device_target fo_obt;
+ struct lu_target fo_lut;
const char *fo_fstype;
struct vfsmount *fo_vfsmnt;
spinlock_t fo_objidlock; /* protect fo_lastobjid */
- spinlock_t fo_translock; /* protect fsd_last_transno */
- struct file *fo_rcvd_filp;
struct file *fo_health_check_filp;
- struct lr_server_data *fo_fsd;
- unsigned long *fo_last_rcvd_slots;
- __u64 fo_mount_count;
unsigned long fo_destroys_in_progress;
struct semaphore fo_create_locks[FILTER_SUBDIR_COUNT];
int fo_sec_level;
};
+#define fo_translock fo_obt.obt_translock
+#define fo_rcvd_filp fo_obt.obt_rcvd_filp
+#define fo_fsd fo_obt.obt_lsd
+#define fo_last_rcvd_slots fo_obt.obt_client_bitmap
+#define fo_mount_count fo_obt.obt_mount_count
+
struct timeout_item {
enum timeout_event ti_event;
cfs_time_t ti_timeout;
struct list_head ti_obd_list;
struct list_head ti_chain;
};
+
#define OSC_MAX_RIF_DEFAULT 8
#define OSC_MAX_RIF_MAX 256
#define OSC_MAX_DIRTY_DEFAULT (OSC_MAX_RIF_DEFAULT * 4)
cfs_dentry_t *mds_fid_de;
int mds_max_mdsize;
int mds_max_cookiesize;
- struct file *mds_rcvd_filp;
- spinlock_t mds_transno_lock;
- __u64 mds_last_transno;
- __u64 mds_mount_count;
__u64 mds_io_epoch;
unsigned long mds_atime_diff;
struct semaphore mds_epoch_sem;
struct ll_fid mds_rootfid;
- struct lr_server_data *mds_server_data;
cfs_dentry_t *mds_pending_dir;
cfs_dentry_t *mds_logs_dir;
cfs_dentry_t *mds_objects_dir;
__u32 mds_lov_objid_lastidx;
struct file *mds_health_check_filp;
- unsigned long *mds_client_bitmap;
-// struct upcall_cache *mds_group_hash;
struct lustre_quota_info mds_quota_info;
struct semaphore mds_qonoff_sem;
struct rw_semaphore mds_notify_lock;
};
+#define mds_transno_lock mds_obt.obt_translock
+#define mds_rcvd_filp mds_obt.obt_rcvd_filp
+#define mds_server_data mds_obt.obt_lsd
+#define mds_client_bitmap mds_obt.obt_client_bitmap
+#define mds_mount_count mds_obt.obt_mount_count
+#define mds_last_transno mds_obt.obt_last_transno
+
/* lov objid */
extern __u32 mds_max_ost_index;
/* initial thread handling transaction */
struct ptlrpc_thread * oti_thread;
__u32 oti_conn_cnt;
+ /** VBR: versions */
+ __u64 oti_pre_version;
struct obd_uuid *oti_ost_uuid;
};
return;
oti->oti_xid = req->rq_xid;
+ /** VBR: take versions from request */
+ if (req->rq_reqmsg != NULL &&
+ lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+ __u64 *pre_version = lustre_msg_get_versions(req->rq_reqmsg);
+ oti->oti_pre_version = pre_version ? pre_version[0] : 0;
+ oti->oti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+ }
+ /** called from mds_create_objects */
if (req->rq_repmsg != NULL)
oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
oti->oti_thread = req->rq_svc_thread;
unsigned long obd_attached:1, /* finished attach */
obd_set_up:1, /* finished setup */
obd_recovering:1, /* there are recoverable clients */
- obd_abort_recovery:1,/* somebody ioctl'ed us to abort */
+ obd_abort_recovery:1,/* recovery expired */
+ obd_version_recov:1, /* obd uses version checking */
obd_replayable:1, /* recovery is enabled; inform clients */
obd_no_transno:1, /* no committed-transno notification */
obd_no_recov:1, /* fail instead of retry messages */
atomic_t obd_refcount;
cfs_waitq_t obd_refcount_waitq;
struct list_head obd_exports;
+ struct list_head obd_delayed_exports;
int obd_num_exports;
spinlock_t obd_nid_lock;
struct ldlm_namespace *obd_namespace;
int obd_max_recoverable_clients;
int obd_connected_clients;
int obd_recoverable_clients;
+ int obd_delayed_clients;
spinlock_t obd_processing_task_lock; /* BH lock (timer) */
__u64 obd_next_recovery_transno;
int obd_replayed_requests;
int obd_requests_queued_for_recovery;
cfs_waitq_t obd_next_transno_waitq;
- struct list_head obd_uncommitted_replies;
- spinlock_t obd_uncommitted_replies_lock;
cfs_timer_t obd_recovery_timer;
time_t obd_recovery_start; /* seconds */
time_t obd_recovery_end; /* seconds, for lprocfs_status */
#define OBD_CALC_STRIPE_END 2
static inline void obd_transno_commit_cb(struct obd_device *obd, __u64 transno,
- int error)
+ struct obd_export *exp, int error)
{
if (error) {
CERROR("%s: transno "LPU64" commit error: %d\n",
obd->obd_name, transno, error);
return;
}
- if (transno > obd->obd_last_committed) {
- CDEBUG(D_INFO, "%s: transno "LPD64" committed\n",
+ if (exp && transno > exp->exp_last_committed) {
+ CDEBUG(D_HA, "%s: transno "LPU64" committed\n",
obd->obd_name, transno);
- obd->obd_last_committed = transno;
- ptlrpc_commit_replies (obd);
+ exp->exp_last_committed = transno;
+ ptlrpc_commit_replies(exp);
} else {
- CDEBUG(D_INFO, "%s: transno "LPD64" committed\n",
+ CDEBUG(D_INFO, "%s: transno "LPU64" committed\n",
obd->obd_name, transno);
}
+ if (transno > obd->obd_last_committed)
+ obd->obd_last_committed = transno;
}
static inline void init_obd_quota_ops(quota_interface_t *interface,
#define OBD_FAIL_TGT_REPLAY_DROP 0x707
#define OBD_FAIL_TGT_FAKE_EXP 0x708
#define OBD_FAIL_TGT_REPLAY_DELAY 0x709
+#define OBD_FAIL_TGT_LAST_REPLAY 0x710
#define OBD_FAIL_MDC_REVALIDATE_PAUSE 0x800
#define OBD_FAIL_MDC_ENQUEUE_PAUSE 0x801
RETURN(rc);
}
-static void destroy_import(struct obd_import *imp)
+void client_destroy_import(struct obd_import *imp)
{
/* drop security policy instance after all rpc finished/aborted
* to let all busy contexts be released. */
ptlrpc_free_rq_pool(imp->imp_rq_pool);
imp->imp_rq_pool = NULL;
}
- destroy_import(imp);
+ client_destroy_import(imp);
cli->cl_import = NULL;
EXIT;
atomic_inc(&target->obd_lock_replay_clients);
if (target->obd_connected_clients ==
target->obd_max_recoverable_clients)
- wake_up(&target->obd_next_transno_waitq);
+ cfs_waitq_signal(&target->obd_next_transno_waitq);
}
spin_unlock_bh(&target->obd_processing_task_lock);
tmp = req_capsule_client_get(&req->rq_pill, &RMF_CONN);
*/
sptlrpc_import_inval_all_ctx(export->exp_imp_reverse);
- destroy_import(export->exp_imp_reverse);
+ client_destroy_import(export->exp_imp_reverse);
}
/* for the rest part, we return -ENOTCONN in case of errors
/* exports created from last_rcvd data, and "fake"
exports created by lctl don't have an import */
if (exp->exp_imp_reverse != NULL)
- destroy_import(exp->exp_imp_reverse);
+ client_destroy_import(exp->exp_imp_reverse);
/* We cancel locks at disconnect time, but this will catch any locks
* granted in a race with recovery-induced disconnect. */
list_empty(&obd->obd_final_req_queue)) {
obd->obd_processing_task = 0;
} else {
- CERROR("%s: Recovery queues ( %s%s%s) are empty\n",
+ CERROR("%s: Recovery queues ( %s%s%s) are not empty\n",
obd->obd_name,
list_empty(&obd->obd_req_replay_queue) ? "" : "req ",
list_empty(&obd->obd_lock_replay_queue) ? "" : "lock ",
* to replay requests that demand on already committed ones
* also, we can replay first non-committed transation */
LASSERT(req_transno != 0);
- if (req_transno == obd->obd_last_committed + 1) {
+ if (obd->obd_version_recov ||
+ req_transno == obd->obd_last_committed + 1) {
obd->obd_next_recovery_transno = req_transno;
} else if (req_transno > obd->obd_last_committed) {
/* can't continue recovery: have no needed transno */
req = list_entry(obd->obd_final_req_queue.next,
struct ptlrpc_request, rq_list);
list_del_init(&req->rq_list);
+ if (req->rq_export->exp_in_recovery) {
+ spin_lock(&req->rq_export->exp_lock);
+ req->rq_export->exp_in_recovery = 0;
+ spin_unlock(&req->rq_export->exp_lock);
+ }
} else {
req = NULL;
}
return req;
}
+/** Recovery predicate: nonzero while the export has NOT failed VBR version
+ * checking (naming mirrors req_replay_done/lock_replay_done below). */
+static inline int req_vbr_done(struct obd_export *exp)
+{
+ return (exp->exp_vbr_failed == 0);
+}
+
static inline int req_replay_done(struct obd_export *exp)
{
return (exp->exp_req_replay_needed == 0);
static int check_for_clients(struct obd_device *obd)
{
- if (obd->obd_abort_recovery)
+ if (obd->obd_abort_recovery || obd->obd_version_recov)
return 1;
LASSERT(obd->obd_connected_clients <= obd->obd_max_recoverable_clients);
if (obd->obd_no_conn == 0 &&
static int target_recovery_thread(void *arg)
{
- struct obd_device *obd = arg;
+ struct lu_target *lut = arg;
+ struct obd_device *obd = lut->lut_obd;
struct ptlrpc_request *req;
struct target_recovery_data *trd = &obd->obd_recovery_data;
struct l_wait_info lwi = { 0 };
env.le_ctx.lc_thread = thread;
CERROR("%s: started recovery thread pid %d\n", obd->obd_name,
- current->pid);
- trd->trd_processing_task = current->pid;
+ cfs_curproc_pid());
+ trd->trd_processing_task = cfs_curproc_pid();
obd->obd_recovering = 1;
complete(&trd->trd_starting);
spin_unlock_bh(&obd->obd_processing_task_lock);
/* If some clients haven't connected in time, evict them */
- if (obd->obd_abort_recovery) {
+ if (obd->obd_connected_clients < obd->obd_max_recoverable_clients) {
CWARN("Some clients haven't connect in time (%d/%d),"
"evict them\n", obd->obd_connected_clients,
obd->obd_max_recoverable_clients);
- obd->obd_abort_recovery = obd->obd_stopping;
- class_disconnect_stale_exports(obd, connect_done,
- exp_flags_from_obd(obd) |
+ class_disconnect_stale_exports(obd, connect_done,
+ exp_flags_from_obd(obd) |
OBD_OPT_ABORT_RECOV);
}
+
/* next stage: replay requests */
delta = jiffies;
obd->obd_req_replaying = 1;
CDEBUG(D_INFO, "1: request replay stage - %d clients from t"LPU64"\n",
- atomic_read(&obd->obd_req_replay_clients),
- obd->obd_next_recovery_transno);
+ atomic_read(&obd->obd_req_replay_clients),
+ obd->obd_next_recovery_transno);
resume_recovery_timer(obd);
while ((req = target_next_replay_req(obd))) {
- LASSERT(trd->trd_processing_task == current->pid);
+ LASSERT(trd->trd_processing_task == cfs_curproc_pid());
DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s",
lustre_msg_get_transno(req->rq_reqmsg),
libcfs_nid2str(req->rq_peer.nid));
/* If some clients haven't replayed requests in time, evict them */
if (obd->obd_abort_recovery) {
- CDEBUG(D_ERROR, "req replay timed out, aborting ...\n");
- obd->obd_abort_recovery = obd->obd_stopping;
- class_disconnect_stale_exports(obd, req_replay_done,
- exp_flags_from_obd(obd) |
+ CDEBUG(D_WARNING, "req replay is aborted\n");
+ class_disconnect_stale_exports(obd, req_replay_done,
+ exp_flags_from_obd(obd) |
OBD_OPT_ABORT_RECOV);
abort_req_replay_queue(obd);
}
+ LASSERT(list_empty(&obd->obd_req_replay_queue));
/* The second stage: replay locks */
CDEBUG(D_INFO, "2: lock replay stage - %d clients\n",
atomic_read(&obd->obd_lock_replay_clients));
resume_recovery_timer(obd);
while ((req = target_next_replay_lock(obd))) {
- LASSERT(trd->trd_processing_task == current->pid);
- DEBUG_REQ(D_HA|D_WARNING, req, "processing lock from %s: ",
+ LASSERT(trd->trd_processing_task == cfs_curproc_pid());
+ DEBUG_REQ(D_HA, req, "processing lock from %s: ",
libcfs_nid2str(req->rq_peer.nid));
handle_recovery_req(thread, req,
trd->trd_recovery_handler);
/* If some clients haven't replayed requests in time, evict them */
if (obd->obd_abort_recovery) {
int stale;
- CERROR("lock replay timed out, aborting ...\n");
- obd->obd_abort_recovery = obd->obd_stopping;
- stale = class_disconnect_stale_exports(obd, lock_replay_done,
- exp_flags_from_obd(obd) |
+ CERROR("lock replay is aborted\n");
+ stale = class_disconnect_stale_exports(obd, lock_replay_done,
+ exp_flags_from_obd(obd) |
OBD_OPT_ABORT_RECOV);
abort_lock_replay_queue(obd);
}
+ LASSERT(list_empty(&obd->obd_lock_replay_queue));
+ /* The third stage: reply on final pings */
+ CDEBUG(D_INFO, "3: final stage - process recovery completion pings\n");
+ /** Update server last boot epoch */
+ lut_boot_epoch_update(lut);
/* We drop recoverying flag to forward all new requests
* to regular mds_handle() since now */
spin_lock_bh(&obd->obd_processing_task_lock);
obd->obd_recovering = obd->obd_abort_recovery = 0;
spin_unlock_bh(&obd->obd_processing_task_lock);
- /* The third stage: reply on final pings */
- CDEBUG(D_INFO, "3: final stage - process recovery completion pings\n");
while ((req = target_next_final_ping(obd))) {
- LASSERT(trd->trd_processing_task == current->pid);
+ LASSERT(trd->trd_processing_task == cfs_curproc_pid());
DEBUG_REQ(D_HA, req, "processing final ping from %s: ",
libcfs_nid2str(req->rq_peer.nid));
handle_recovery_req(thread, req,
trd->trd_recovery_handler);
}
+ /* evict exports which failed version-based recovery (VBR) */
+ class_disconnect_stale_exports(obd, req_vbr_done,
+ exp_flags_from_obd(obd) |
+ OBD_OPT_ABORT_RECOV);
delta = (jiffies - delta) / HZ;
CDEBUG(D_INFO,"4: recovery completed in %lus - %d/%d reqs/locks\n",
RETURN(rc);
}
-int target_start_recovery_thread(struct obd_device *obd, svc_handler_t handler)
+static int target_start_recovery_thread(struct lu_target *lut,
+ svc_handler_t handler)
{
+ struct obd_device *obd = lut->lut_obd;
int rc = 0;
struct target_recovery_data *trd = &obd->obd_recovery_data;
init_completion(&trd->trd_finishing);
trd->trd_recovery_handler = handler;
- if (kernel_thread(target_recovery_thread, obd, 0) > 0) {
+ if (kernel_thread(target_recovery_thread, lut, 0) > 0) {
wait_for_completion(&trd->trd_starting);
LASSERT(obd->obd_recovering != 0);
} else
struct target_recovery_data *trd = &obd->obd_recovery_data;
CERROR("%s: Aborting recovery\n", obd->obd_name);
obd->obd_abort_recovery = 1;
- wake_up(&obd->obd_next_transno_waitq);
+ cfs_waitq_signal(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
wait_for_completion(&trd->trd_finishing);
} else {
obd->obd_name, obd->obd_recoverable_clients,
cfs_time_current_sec()- obd->obd_recovery_start,
obd->obd_connected_clients);
+
spin_lock_bh(&obd->obd_processing_task_lock);
- if (obd->obd_recovering)
- obd->obd_abort_recovery = 1;
+ obd->obd_version_recov = 1;
+ CDEBUG(D_INFO, "VBR is used for %d clients from t"LPU64"\n",
+ atomic_read(&obd->obd_req_replay_clients),
+ obd->obd_next_recovery_transno);
cfs_waitq_signal(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
}
-void target_recovery_init(struct obd_device *obd, svc_handler_t handler)
+void target_recovery_init(struct lu_target *lut, svc_handler_t handler)
{
+ struct obd_device *obd = lut->lut_obd;
if (obd->obd_max_recoverable_clients == 0)
return;
CWARN("RECOVERY: service %s, %d recoverable clients, "
"last_transno "LPU64"\n", obd->obd_name,
obd->obd_max_recoverable_clients, obd->obd_last_committed);
+ LASSERT(obd->obd_stopping == 0);
obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
obd->obd_recovery_start = 0;
obd->obd_recovery_end = 0;
/* bz13079: this should be set to desired value for ost but not for mds */
obd->obd_recovery_max_time = OBD_RECOVERY_MAX_TIME;
cfs_timer_init(&obd->obd_recovery_timer, target_recovery_expired, obd);
- target_start_recovery_thread(obd, handler);
+ target_start_recovery_thread(lut, handler);
}
EXPORT_SYMBOL(target_recovery_init);
#endif
-int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req)
+static int target_process_req_flags(struct obd_device *obd,
+ struct ptlrpc_request *req)
{
struct obd_export *exp = req->rq_export;
LASSERT(exp != NULL);
obd->obd_recoverable_clients--;
if (atomic_read(&obd->obd_req_replay_clients) == 0)
CDEBUG(D_HA, "all clients have replayed reqs\n");
- wake_up(&obd->obd_next_transno_waitq);
}
spin_unlock_bh(&obd->obd_processing_task_lock);
}
atomic_dec(&obd->obd_lock_replay_clients);
if (atomic_read(&obd->obd_lock_replay_clients) == 0)
CDEBUG(D_HA, "all clients have replayed locks\n");
- wake_up(&obd->obd_next_transno_waitq);
}
spin_unlock_bh(&obd->obd_processing_task_lock);
}
struct list_head *tmp;
int inserted = 0;
__u64 transno = lustre_msg_get_transno(req->rq_reqmsg);
-
ENTRY;
if (obd->obd_recovery_data.trd_processing_task == cfs_curproc_pid()) {
RETURN(-ENOMEM);
DEBUG_REQ(D_HA, req, "queue final req");
spin_lock_bh(&obd->obd_processing_task_lock);
+ cfs_waitq_signal(&obd->obd_next_transno_waitq);
if (obd->obd_recovering)
list_add_tail(&req->rq_list, &obd->obd_final_req_queue);
else {
RETURN(-ENOMEM);
DEBUG_REQ(D_HA, req, "queue lock replay req");
spin_lock_bh(&obd->obd_processing_task_lock);
+ cfs_waitq_signal(&obd->obd_next_transno_waitq);
LASSERT(obd->obd_recovering);
/* usually due to recovery abort */
if (!req->rq_export->exp_in_recovery) {
LASSERT(req->rq_export->exp_lock_replay_needed);
list_add_tail(&req->rq_list, &obd->obd_lock_replay_queue);
spin_unlock_bh(&obd->obd_processing_task_lock);
- wake_up(&obd->obd_next_transno_waitq);
RETURN(0);
}
list_add_tail(&req->rq_list, &obd->obd_req_replay_queue);
obd->obd_requests_queued_for_recovery++;
- wake_up(&obd->obd_next_transno_waitq);
+ cfs_waitq_signal(&obd->obd_next_transno_waitq);
spin_unlock_bh(&obd->obd_processing_task_lock);
RETURN(0);
}
rs->rs_transno = req->rq_transno;
rs->rs_export = exp;
- spin_lock(&obd->obd_uncommitted_replies_lock);
-
+ spin_lock(&exp->exp_uncommitted_replies_lock);
CDEBUG(D_NET, "rs transno = "LPU64", last committed = "LPU64"\n",
- rs->rs_transno, obd->obd_last_committed);
- if (rs->rs_transno > obd->obd_last_committed) {
+ rs->rs_transno, exp->exp_last_committed);
+ if (rs->rs_transno > exp->exp_last_committed) {
/* not committed already */
- list_add_tail (&rs->rs_obd_list,
- &obd->obd_uncommitted_replies);
+ list_add_tail(&rs->rs_obd_list,
+ &exp->exp_uncommitted_replies);
}
+ spin_unlock (&exp->exp_uncommitted_replies_lock);
- spin_unlock (&obd->obd_uncommitted_replies_lock);
- spin_lock (&exp->exp_lock);
-
- list_add_tail (&rs->rs_exp_list, &exp->exp_outstanding_replies);
-
+ spin_lock(&exp->exp_lock);
+ list_add_tail(&rs->rs_exp_list, &exp->exp_outstanding_replies);
spin_unlock(&exp->exp_lock);
netrc = target_send_reply_msg (req, rc, fail_id);
}
spin_lock(&rs->rs_lock);
- if (rs->rs_transno <= obd->obd_last_committed ||
+ if (rs->rs_transno <= exp->exp_last_committed ||
(!rs->rs_on_net && !rs->rs_no_ack) ||
list_empty(&rs->rs_exp_list) || /* completed already */
list_empty(&rs->rs_obd_list)) {
void target_committed_to_req(struct ptlrpc_request *req)
{
- struct obd_device *obd;
-
- if (req == NULL || req->rq_export == NULL)
- return;
-
- obd = req->rq_export->exp_obd;
- if (obd == NULL)
- return;
+ struct obd_export *exp = req->rq_export;
- if (!obd->obd_no_transno && req->rq_repmsg != NULL)
+ if (!exp->exp_obd->obd_no_transno && req->rq_repmsg != NULL)
lustre_msg_set_last_committed(req->rq_repmsg,
- obd->obd_last_committed);
+ exp->exp_last_committed);
else
DEBUG_REQ(D_IOCTL, req, "not sending last_committed update (%d/"
- "%d)", obd->obd_no_transno, req->rq_repmsg == NULL);
+ "%d)", exp->exp_obd->obd_no_transno,
+ req->rq_repmsg == NULL);
CDEBUG(D_INFO, "last_committed "LPU64", transno "LPU64", xid "LPU64"\n",
- obd->obd_last_committed, req->rq_transno, req->rq_xid);
+ exp->exp_last_committed, req->rq_transno, req->rq_xid);
}
-
EXPORT_SYMBOL(target_committed_to_req);
int target_handle_qc_callback(struct ptlrpc_request *req)
EXPORT_SYMBOL(client_obd_cleanup);
EXPORT_SYMBOL(client_connect_import);
EXPORT_SYMBOL(client_disconnect_export);
-EXPORT_SYMBOL(target_start_recovery_thread);
EXPORT_SYMBOL(target_stop_recovery_thread);
EXPORT_SYMBOL(target_handle_connect);
EXPORT_SYMBOL(target_cleanup_recovery);
GOTO(out_cleanup, rc = -ENOMEM);
ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_FID |
- OBD_CONNECT_AT;
+ OBD_CONNECT_AT | OBD_CONNECT_VBR;
#ifdef LIBLUSTRE_POSIX_ACL
ocd->ocd_connect_flags |= OBD_CONNECT_ACL;
#endif
sizeof(async), &async, NULL);
ocd.ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_VERSION |
- OBD_CONNECT_FID | OBD_CONNECT_AT;
+ OBD_CONNECT_FID | OBD_CONNECT_AT |
+ OBD_CONNECT_VBR;
#ifdef LIBLUSTRE_POSIX_ACL
ocd.ocd_connect_flags |= OBD_CONNECT_ACL;
#endif
OBD_CONNECT_VERSION | OBD_CONNECT_MDS_CAPA |
OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET|
OBD_CONNECT_FID | OBD_CONNECT_AT |
- OBD_CONNECT_LOV_V3 | OBD_CONNECT_RMT_CLIENT;
+ OBD_CONNECT_LOV_V3 | OBD_CONNECT_RMT_CLIENT |
+ OBD_CONNECT_VBR;
#ifdef HAVE_LRU_RESIZE_SUPPORT
if (sbi->ll_flags & LL_SBI_LRU_RESIZE)
OBD_CONNECT_CANCELSET | OBD_CONNECT_FID |
OBD_CONNECT_SRVLOCK | OBD_CONNECT_TRUNCLOCK|
OBD_CONNECT_AT | OBD_CONNECT_RMT_CLIENT |
- OBD_CONNECT_OSS_CAPA | OBD_CONNECT_GRANT_SHRINK;
+ OBD_CONNECT_OSS_CAPA | OBD_CONNECT_VBR|
+ OBD_CONNECT_GRANT_SHRINK;
if (!OBD_FAIL_CHECK(OBD_FAIL_OSC_CONNECT_CKSUM)) {
/* OBD_CONNECT_CKSUM should always be set, even if checksums are
*/
static __u64 fsfilt_ext3_get_version(struct inode *inode)
{
+ CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
+ EXT3_I(inode)->i_fs_version, inode->i_ino);
return EXT3_I(inode)->i_fs_version;
}
{
__u64 old_version = EXT3_I(inode)->i_fs_version;
+ CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+ new_version, old_version, inode->i_ino);
(EXT3_I(inode))->i_fs_version = new_version;
+ /* version is set after all inode operations are finished, so we should
+ * mark it dirty here */
+ inode->i_sb->s_op->dirty_inode(inode);
return old_version;
}
mdd->mdd_txn_cb.dtc_txn_stop = mdd_txn_stop_cb;
mdd->mdd_txn_cb.dtc_txn_commit = mdd_txn_commit_cb;
mdd->mdd_txn_cb.dtc_cookie = mdd;
+ mdd->mdd_txn_cb.dtc_tag = LCT_MD_THREAD;
CFS_INIT_LIST_HEAD(&mdd->mdd_txn_cb.dtc_linkage);
mdd->mdd_atime_diff = MAX_ATIME_DIFF;
return next->do_ops->do_object_sync(env, next);
}
+static dt_obj_version_t mdd_version_get(const struct lu_env *env,
+ struct md_object *obj)
+{
+ struct mdd_object *mdd_obj = md2mdd_obj(obj);
+
+ LASSERT(mdd_object_exists(mdd_obj));
+ return do_version_get(env, mdd_object_child(mdd_obj));
+}
+
+static void mdd_version_set(const struct lu_env *env, struct md_object *obj,
+ dt_obj_version_t version)
+{
+ struct mdd_object *mdd_obj = md2mdd_obj(obj);
+
+ LASSERT(mdd_object_exists(mdd_obj));
+ return do_version_set(env, mdd_object_child(mdd_obj), version);
+}
+
const struct md_object_operations mdd_obj_ops = {
.moo_permission = mdd_permission,
.moo_attr_get = mdd_attr_get,
.moo_readlink = mdd_readlink,
.moo_capa_get = mdd_capa_get,
.moo_object_sync = mdd_object_sync,
+ .moo_version_get = mdd_version_get,
+ .moo_version_set = mdd_version_set,
.moo_path = mdd_path,
};
for (i = 0; i < 2; i++) {
lck_cpu_to_le(tmp, &keys[i]);
- rc = mdt_record_write(env, mdt->mdt_ck_obj,
- mdt_buf_const(env, tmp, sizeof(*tmp)),
- &off, th);
+ rc = dt_record_write(env, mdt->mdt_ck_obj,
+ mdt_buf_const(env, tmp, sizeof(*tmp)),
+ &off, th);
if (rc)
break;
}
tmp = &mti->mti_capa_key;
for (i = 0; i < 2; i++) {
- rc = mdt_record_read(env, mdt->mdt_ck_obj,
- mdt_buf(env, tmp, sizeof(*tmp)), &off);
+ rc = dt_record_read(env, mdt->mdt_ck_obj,
+ mdt_buf(env, tmp, sizeof(*tmp)), &off);
if (rc)
return rc;
info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+ info->mti_mos[0] = NULL;
+ info->mti_mos[1] = NULL;
+ info->mti_mos[2] = NULL;
+ info->mti_mos[3] = NULL;
memset(&info->mti_attr, 0, sizeof(info->mti_attr));
info->mti_body = NULL;
rep->lock_policy_res2 = clear_serious(rc);
lhc->mlh_reg_lh.cookie = 0ull;
- if (rc == -ENOTCONN || rc == -ENODEV) {
+ if (rc == -ENOTCONN || rc == -ENODEV ||
+ rc == -EOVERFLOW) { /**< if VBR failure then return error */
/*
* If it is the disconnect error (ENODEV & ENOCONN), the error
* will be returned by rq_status, and client at ptlrpc layer
int waited = 0;
ENTRY;
+ target_recovery_fini(obd);
/* At this point, obd exports might still be on the "obd_zombie_exports"
* list, and obd_zombie_impexp_thread() is trying to destroy them.
* We wait a little bit until all exports (except the self-export)
ping_evictor_stop();
- target_recovery_fini(obd);
mdt_stop_ptlrpc_service(m);
mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
mdt_obd_llog_cleanup(obd);
#ifdef HAVE_QUOTA_SUPPORT
next->md_ops->mdo_quota.mqo_cleanup(env, next);
#endif
+ lut_fini(env, &m->mdt_lut);
mdt_fs_cleanup(env, m);
upcall_cache_cleanup(m->mdt_identity_cache);
m->mdt_identity_cache = NULL;
GOTO(err_fini_proc, rc);
}
- rc = mdt_fld_init(env, obd->obd_name, m);
+ rc = lut_init(env, &m->mdt_lut, obd, m->mdt_bottom);
if (rc)
GOTO(err_fini_stack, rc);
+ rc = mdt_fld_init(env, obd->obd_name, m);
+ if (rc)
+ GOTO(err_lut, rc);
+
rc = mdt_seq_init(env, obd->obd_name, m);
if (rc)
GOTO(err_fini_fld, rc);
server_put_mount_2(dev, lmi->lmi_mnt);
lmi = NULL;
- target_recovery_init(obd, mdt_recovery_handle);
+ target_recovery_init(&m->mdt_lut, mdt_recovery_handle);
rc = mdt_start_ptlrpc_service(m);
if (rc)
mdt_seq_fini(env, m);
err_fini_fld:
mdt_fld_fini(env, m);
+err_lut:
+ lut_fini(env, &m->mdt_lut);
err_fini_stack:
mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
err_fini_proc:
* struct lustre_handle
*/
#include <lustre/lustre_idl.h>
+#include <lustre_disk.h>
+#include <lu_target.h>
#include <md_object.h>
-#include <dt_object.h>
#include <lustre_fid.h>
#include <lustre_fld.h>
#include <lustre_req_layout.h>
-/* LR_CLIENT_SIZE, etc. */
-#include <lustre_disk.h>
#include <lustre_sec.h>
#include <lvfs.h>
#include <lustre_idmap.h>
#include <lustre_eacl.h>
#include <lustre_fsfilt.h>
-static inline __u64 lcd_last_transno(struct lsd_client_data *lcd)
-{
- return max(lcd->lcd_last_transno, lcd->lcd_last_close_transno);
-}
-
-static inline __u64 lcd_last_xid(struct lsd_client_data *lcd)
-{
- return max(lcd->lcd_last_xid, lcd->lcd_last_close_xid);
-}
-
/* check if request's xid is equal to last one or not*/
static inline int req_xid_is_last(struct ptlrpc_request *req)
{
/* underlying device */
struct md_device *mdt_child;
struct dt_device *mdt_bottom;
+ /** target device */
+ struct lu_target mdt_lut;
/*
* Options bit-fields.
*/
spinlock_t mdt_ioepoch_lock;
__u64 mdt_ioepoch;
- /* Transaction related stuff here */
- spinlock_t mdt_transno_lock;
- __u64 mdt_last_transno;
-
/* transaction callbacks */
struct dt_txn_callback mdt_txn_cb;
- /* last_rcvd file */
- struct dt_object *mdt_last_rcvd;
/* these values should be updated from lov if necessary.
* or should be placed somewhere else. */
int mdt_max_mdsize;
int mdt_max_cookiesize;
- __u64 mdt_mount_count;
-
- /* last_rcvd data */
- struct lr_server_data mdt_lsd;
- spinlock_t mdt_client_bitmap_lock;
- unsigned long mdt_client_bitmap[(LR_MAX_CLIENTS >> 3) / sizeof(long)];
struct upcall_cache *mdt_identity_cache;
int mdt_sec_level;
};
+#define mdt_transno_lock mdt_lut.lut_translock
+#define mdt_last_transno mdt_lut.lut_last_transno
+#define mdt_last_rcvd mdt_lut.lut_last_rcvd
+#define mdt_mount_count mdt_lut.lut_mount_count
+#define mdt_lsd mdt_lut.lut_lsd
+#define mdt_client_bitmap_lock mdt_lut.lut_client_bitmap_lock
+#define mdt_client_bitmap mdt_lut.lut_client_bitmap
+
#define MDT_SERVICE_WATCHDOG_FACTOR (2000)
#define MDT_ROCOMPAT_SUPP (OBD_ROCOMPAT_LOVOBJID)
#define MDT_INCOMPAT_SUPP (OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR)
*/
struct mdt_reint_record mti_rr;
+ /** md objects included in operation */
+ struct mdt_object *mti_mos[PTLRPC_NUM_VERSIONS];
+
/*
* Operation specification (currently create and lookup)
*/
struct md_attr mti_tmp_attr;
};
+#define mti_parent mti_mos[0]
+#define mti_child mti_mos[1]
+#define mti_parent1 mti_mos[2]
+#define mti_child1 mti_mos[3]
+
typedef void (*mdt_cb_t)(const struct mdt_device *mdt, __u64 transno,
void *data, int err);
struct mdt_commit_cb {
MDT_TXN_LAST_RCVD_WRITE_OP,
};
-
/*
* Info allocated per-transaction.
*/
struct mdt_txn_info {
__u64 txi_transno;
unsigned int txi_cb_count;
- struct mdt_commit_cb txi_cb[MDT_MAX_COMMIT_CB];
+ struct lut_commit_cb txi_cb[MDT_MAX_COMMIT_CB];
};
extern struct lu_context_key mdt_txn_key;
static inline void mdt_trans_add_cb(const struct thandle *th,
- mdt_cb_t cb_func, void *cb_data)
+ lut_cb_t cb_func, void *cb_data)
{
struct mdt_txn_info *txi;
LASSERT(txi->txi_cb_count < ARRAY_SIZE(txi->txi_cb));
/* add new callback */
- txi->txi_cb[txi->txi_cb_count].mdt_cb_func = cb_func;
- txi->txi_cb[txi->txi_cb_count].mdt_cb_data = cb_data;
+ txi->txi_cb[txi->txi_cb_count].lut_cb_func = cb_func;
+ txi->txi_cb[txi->txi_cb_count].lut_cb_data = cb_data;
txi->txi_cb_count++;
}
struct mdt_lock_handle *lhc);
extern void target_recovery_fini(struct obd_device *obd);
-extern void target_recovery_init(struct obd_device *obd,
+extern void target_recovery_init(struct lu_target *lut,
svc_handler_t handler);
int mdt_fs_setup(const struct lu_env *, struct mdt_device *,
struct obd_device *, struct lustre_sb_info *lsi);
int mdt_init_ucred(struct mdt_thread_info *, struct mdt_body *);
int mdt_init_ucred_reint(struct mdt_thread_info *);
void mdt_exit_ucred(struct mdt_thread_info *);
+int mdt_version_get_check(struct mdt_thread_info *, int);
/* mdt_idmap.c */
int mdt_init_sec_level(struct mdt_thread_info *);
/* Not found and with MDS_OPEN_CREAT: let's create it. */
mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
+ info->mti_mos[0] = parent;
+ info->mti_mos[1] = child;
+ result = mdt_version_get_check(info, 0);
+ if (result)
+ GOTO(out_child, result);
+
/* Let lower layers know what is lock mode on directory. */
info->mti_spec.sp_cr_mode =
mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
return buf;
}
-int mdt_record_read(const struct lu_env *env,
- struct dt_object *dt, struct lu_buf *buf, loff_t *pos)
-{
- int rc;
-
- LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
-
- rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
-
- if (rc == buf->lb_len)
- rc = 0;
- else if (rc >= 0)
- rc = -EFAULT;
- return rc;
-}
-
-int mdt_record_write(const struct lu_env *env,
- struct dt_object *dt, const struct lu_buf *buf,
- loff_t *pos, struct thandle *th)
-{
- int rc;
-
- LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
- LASSERT(th != NULL);
- rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1);
- if (rc == buf->lb_len)
- rc = 0;
- else if (rc >= 0)
- rc = -EFAULT;
- return rc;
-}
-
static inline int mdt_trans_credit_get(const struct lu_env *env,
struct mdt_device *mdt,
enum mdt_txn_op op)
mdt->mdt_bottom->dd_ops->dt_trans_stop(env, th);
}
-/* last_rcvd handling */
-static inline void lsd_le_to_cpu(struct lr_server_data *buf,
- struct lr_server_data *lsd)
-{
- memcpy(lsd->lsd_uuid, buf->lsd_uuid, sizeof (lsd->lsd_uuid));
- lsd->lsd_last_transno = le64_to_cpu(buf->lsd_last_transno);
- lsd->lsd_mount_count = le64_to_cpu(buf->lsd_mount_count);
- lsd->lsd_feature_compat = le32_to_cpu(buf->lsd_feature_compat);
- lsd->lsd_feature_rocompat = le32_to_cpu(buf->lsd_feature_rocompat);
- lsd->lsd_feature_incompat = le32_to_cpu(buf->lsd_feature_incompat);
- lsd->lsd_server_size = le32_to_cpu(buf->lsd_server_size);
- lsd->lsd_client_start = le32_to_cpu(buf->lsd_client_start);
- lsd->lsd_client_size = le16_to_cpu(buf->lsd_client_size);
-}
-
-static inline void lsd_cpu_to_le(struct lr_server_data *lsd,
- struct lr_server_data *buf)
-{
- memcpy(buf->lsd_uuid, lsd->lsd_uuid, sizeof (lsd->lsd_uuid));
- buf->lsd_last_transno = cpu_to_le64(lsd->lsd_last_transno);
- buf->lsd_mount_count = cpu_to_le64(lsd->lsd_mount_count);
- buf->lsd_feature_compat = cpu_to_le32(lsd->lsd_feature_compat);
- buf->lsd_feature_rocompat = cpu_to_le32(lsd->lsd_feature_rocompat);
- buf->lsd_feature_incompat = cpu_to_le32(lsd->lsd_feature_incompat);
- buf->lsd_server_size = cpu_to_le32(lsd->lsd_server_size);
- buf->lsd_client_start = cpu_to_le32(lsd->lsd_client_start);
- buf->lsd_client_size = cpu_to_le16(lsd->lsd_client_size);
-}
-
-static inline void lcd_le_to_cpu(struct lsd_client_data *buf,
- struct lsd_client_data *lcd)
-{
- memcpy(lcd->lcd_uuid, buf->lcd_uuid, sizeof (lcd->lcd_uuid));
- lcd->lcd_last_transno = le64_to_cpu(buf->lcd_last_transno);
- lcd->lcd_last_xid = le64_to_cpu(buf->lcd_last_xid);
- lcd->lcd_last_result = le32_to_cpu(buf->lcd_last_result);
- lcd->lcd_last_data = le32_to_cpu(buf->lcd_last_data);
- lcd->lcd_last_close_transno = le64_to_cpu(buf->lcd_last_close_transno);
- lcd->lcd_last_close_xid = le64_to_cpu(buf->lcd_last_close_xid);
- lcd->lcd_last_close_result = le32_to_cpu(buf->lcd_last_close_result);
-}
-
-static inline void lcd_cpu_to_le(struct lsd_client_data *lcd,
- struct lsd_client_data *buf)
-{
- memcpy(buf->lcd_uuid, lcd->lcd_uuid, sizeof (lcd->lcd_uuid));
- buf->lcd_last_transno = cpu_to_le64(lcd->lcd_last_transno);
- buf->lcd_last_xid = cpu_to_le64(lcd->lcd_last_xid);
- buf->lcd_last_result = cpu_to_le32(lcd->lcd_last_result);
- buf->lcd_last_data = cpu_to_le32(lcd->lcd_last_data);
- buf->lcd_last_close_transno = cpu_to_le64(lcd->lcd_last_close_transno);
- buf->lcd_last_close_xid = cpu_to_le64(lcd->lcd_last_close_xid);
- buf->lcd_last_close_result = cpu_to_le32(lcd->lcd_last_close_result);
-}
-
static inline int mdt_last_rcvd_header_read(const struct lu_env *env,
struct mdt_device *mdt)
{
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
mti->mti_off = 0;
- rc = mdt_record_read(env, mdt->mdt_last_rcvd,
- mdt_buf(env, &mti->mti_lsd, sizeof(mti->mti_lsd)),
- &mti->mti_off);
+ rc = dt_record_read(env, mdt->mdt_last_rcvd,
+ mdt_buf(env, &mti->mti_lsd, sizeof(mti->mti_lsd)),
+ &mti->mti_off);
if (rc == 0)
lsd_le_to_cpu(&mti->mti_lsd, &mdt->mdt_lsd);
return rc;
}
-static void mdt_client_cb(const struct mdt_device *mdt, __u64 transno,
- void *data, int err)
-{
- struct obd_device *obd = mdt2obd_dev(mdt);
- target_client_add_cb(obd, transno, data, err);
-}
-
static inline int mdt_last_rcvd_header_write(const struct lu_env *env,
struct mdt_device *mdt,
int need_sync)
lsd_cpu_to_le(&mdt->mdt_lsd, &mti->mti_lsd);
if (need_sync && mti->mti_exp)
- mdt_trans_add_cb(th, mdt_client_cb, mti->mti_exp);
+ mdt_trans_add_cb(th, lut_cb_client, mti->mti_exp);
- rc = mdt_record_write(env, mdt->mdt_last_rcvd,
- mdt_buf_const(env, &mti->mti_lsd,
- sizeof(mti->mti_lsd)),
- &mti->mti_off, th);
+ rc = dt_record_write(env, mdt->mdt_last_rcvd,
+ mdt_buf_const(env, &mti->mti_lsd,
+ sizeof(mti->mti_lsd)),
+ &mti->mti_off, th);
mdt_trans_stop(env, mdt, th);
mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
tmp = &mti->mti_lcd;
- rc = mdt_record_read(env, mdt->mdt_last_rcvd,
- mdt_buf(env, tmp, sizeof(*tmp)), off);
+ rc = dt_record_read(env, mdt->mdt_last_rcvd,
+ mdt_buf(env, tmp, sizeof(*tmp)), off);
if (rc == 0)
lcd_le_to_cpu(tmp, lcd);
lcd_cpu_to_le(lcd, tmp);
- rc = mdt_record_write(env, mdt->mdt_last_rcvd,
- mdt_buf_const(env, tmp, sizeof(*tmp)), off, th);
+ rc = dt_record_write(env, mdt->mdt_last_rcvd,
+ mdt_buf_const(env, tmp, sizeof(*tmp)), off, th);
CDEBUG(D_INFO, "write lcd @%d rc = %d:\n"
"uuid = %s\n"
rc = mdt_client_add(env, mdt, cl_idx);
/* can't fail existing */
LASSERTF(rc == 0, "rc = %d\n", rc);
+ /* VBR: set export last committed version */
+ exp->exp_last_committed = last_transno;
lcd = NULL;
spin_lock(&exp->exp_lock);
exp->exp_connecting = 0;
lsd->lsd_mount_count = mdt->mdt_mount_count;
/* save it, so mount count and last_transno is current */
- rc = mdt_server_data_update(env, mdt, (mti->mti_exp &&
+ rc = mdt_server_data_update(env, mdt, (mti->mti_exp &&
mti->mti_exp->exp_need_sync));
if (rc)
GOTO(err_client, rc);
RETURN(0);
err_client:
- target_recovery_fini(obd);
+ class_disconnect_exports(obd);
out:
return rc;
}
if (IS_ERR(th))
RETURN(PTR_ERR(th));
- /*
+ /*
* Until this operations will be committed the sync is needed
* for this export. This should be done _after_ starting the
* transaction so that many connecting clients will not bring
- * server down with lots of sync writes.
+ * server down with lots of sync writes.
*/
- mdt_trans_add_cb(th, mdt_client_cb, mti->mti_exp);
+ mdt_trans_add_cb(th, lut_cb_client, mti->mti_exp);
spin_lock(&mti->mti_exp->exp_lock);
mti->mti_exp->exp_need_sync = 1;
spin_unlock(&mti->mti_exp->exp_lock);
GOTO(free, rc = PTR_ERR(th));
if (need_sync) {
- /*
+ /*
* Until this operations will be committed the sync
- * is needed for this export.
+ * is needed for this export.
*/
- mdt_trans_add_cb(th, mdt_client_cb, exp);
+ mdt_trans_add_cb(th, lut_cb_client, exp);
}
mutex_down(&med->med_lcd_lock);
clear_bit(med->med_lr_idx, mdt->mdt_client_bitmap);
spin_unlock(&mdt->mdt_client_bitmap_lock);
- /*
+ /*
* Make sure the server's last_transno is up to date. Do this
* after the client is freed so we know all the client's
- * transactions have been committed.
+ * transactions have been committed.
*/
mdt_server_data_update(env, mdt, need_sync);
loff_t off;
int err;
__s32 rc = th->th_result;
- __u64 *transno_p;
ENTRY;
LASSERT(req);
}
off = med->med_lr_off;
+ LASSERT(ergo(mti->mti_transno == 0, rc != 0));
mutex_down(&med->med_lcd_lock);
if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
- transno_p = &lcd->lcd_last_close_transno;
+ if (mti->mti_transno != 0)
+ lcd->lcd_last_close_transno = mti->mti_transno;
lcd->lcd_last_close_xid = req->rq_xid;
lcd->lcd_last_close_result = rc;
} else {
- transno_p = &lcd->lcd_last_transno;
+ /* VBR: save versions in last_rcvd for reply reconstruction. */
+ __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+ if (pre_versions) {
+ lcd->lcd_pre_versions[0] = pre_versions[0];
+ lcd->lcd_pre_versions[1] = pre_versions[1];
+ lcd->lcd_pre_versions[2] = pre_versions[2];
+ lcd->lcd_pre_versions[3] = pre_versions[3];
+ }
+ if (mti->mti_transno != 0)
+ lcd->lcd_last_transno = mti->mti_transno;
lcd->lcd_last_xid = req->rq_xid;
lcd->lcd_last_result = rc;
/*XXX: save intent_disposition in mdt_thread_info?
lcd->lcd_last_data = mti->mti_opdata;
}
- /*
- * When we store zero transno in lcd we can lost last transno value
- * because lcd contains 0, but lsd is not yet written
- * The server data should be updated also if the latest
- * transno is rewritten by zero. See the bug 11125 for details.
- */
- if (mti->mti_transno == 0 &&
- *transno_p == mdt->mdt_last_transno)
- mdt_server_data_update(mti->mti_env, mdt,
- (mti->mti_exp &&
- mti->mti_exp->exp_need_sync));
-
- *transno_p = mti->mti_transno;
-
if (off <= 0) {
CERROR("client idx %d has offset %lld\n", med->med_lr_idx, off);
err = -EINVAL;
return 0;
}
+/* VBR: stamp every object touched by this operation (collected in
+ * mti_mos[]) with the operation's transaction number as its new
+ * version, so a later replay can detect conflicting updates. */
+static void mdt_versions_set(struct mdt_thread_info *info)
+{
+ int i;
+ for (i = 0; i < PTLRPC_NUM_VERSIONS; i++)
+ if (info->mti_mos[i] != NULL)
+ mo_version_set(info->mti_env,
+ mdt_object_child(info->mti_mos[i]),
+ info->mti_transno);
+}
+
/* Update last_rcvd records with latests transaction data */
static int mdt_txn_stop_cb(const struct lu_env *env,
struct thandle *txn, void *cookie)
if (mti->mti_transno != 0) {
CERROR("Replay transno "LPU64" failed: rc %i\n",
mti->mti_transno, txn->th_result);
- mti->mti_transno = 0;
}
} else if (mti->mti_transno == 0) {
mti->mti_transno = ++ mdt->mdt_last_transno;
if (mti->mti_transno > mdt->mdt_last_transno)
mdt->mdt_last_transno = mti->mti_transno;
}
-
+ spin_unlock(&mdt->mdt_transno_lock);
/* sometimes the reply message has not been successfully packed */
LASSERT(req != NULL && req->rq_repmsg != NULL);
+ /** VBR: set new versions */
+ if (txn->th_result == 0)
+ mdt_versions_set(mti);
+
/* filling reply data */
CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
mti->mti_transno, req->rq_export->exp_obd->obd_last_committed);
lcd_last_xid(req->rq_export->exp_mdt_data.med_lcd));
/* save transno for the commit callback */
txi->txi_transno = mti->mti_transno;
- spin_unlock(&mdt->mdt_transno_lock);
+
+ /* add separate commit callback for transaction handling because we need
+ * export as parameter */
+ mdt_trans_add_cb(txn, lut_cb_last_committed, mti->mti_exp);
return mdt_last_rcvd_update(mti, txn);
}
struct thandle *txn, void *cookie)
{
struct mdt_device *mdt = cookie;
- struct obd_device *obd = mdt2obd_dev(mdt);
struct mdt_txn_info *txi;
int i;
txi = lu_context_key_get(&txn->th_ctx, &mdt_txn_key);
- /* copy of obd_transno_commit_cb() but with locking */
- spin_lock(&mdt->mdt_transno_lock);
- if (txi->txi_transno > obd->obd_last_committed) {
- obd->obd_last_committed = txi->txi_transno;
- spin_unlock(&mdt->mdt_transno_lock);
- ptlrpc_commit_replies(obd);
- } else
- spin_unlock(&mdt->mdt_transno_lock);
-
- if (txi->txi_transno)
- CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
- obd->obd_name, txi->txi_transno);
-
/* iterate through all additional callbacks */
for (i = 0; i < txi->txi_cb_count; i++) {
- txi->txi_cb[i].mdt_cb_func(mdt, txi->txi_transno,
- txi->txi_cb[i].mdt_cb_data, 0);
+ txi->txi_cb[i].lut_cb_func(&mdt->mdt_lut, txi->txi_transno,
+ txi->txi_cb[i].lut_cb_data, 0);
}
return 0;
}
mdt->mdt_txn_cb.dtc_txn_stop = mdt_txn_stop_cb;
mdt->mdt_txn_cb.dtc_txn_commit = mdt_txn_commit_cb;
mdt->mdt_txn_cb.dtc_cookie = mdt;
+ mdt->mdt_txn_cb.dtc_tag = LCT_MD_THREAD;
CFS_INIT_LIST_HEAD(&mdt->mdt_txn_cb.dtc_linkage);
dt_txn_callback_add(mdt->mdt_bottom, &mdt->mdt_txn_cb);
- o = dt_store_open(env, mdt->mdt_bottom, "", LAST_RCVD, &fid);
- if (!IS_ERR(o)) {
- mdt->mdt_last_rcvd = o;
- rc = mdt_server_data_init(env, mdt, lsi);
- if (rc)
- GOTO(put_last_rcvd, rc);
- } else {
- rc = PTR_ERR(o);
- CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
+ rc = mdt_server_data_init(env, mdt, lsi);
+ if (rc)
RETURN(rc);
- }
o = dt_store_open(env, mdt->mdt_bottom, "", CAPA_KEYS, &fid);
if (!IS_ERR(o)) {
} else {
rc = PTR_ERR(o);
CERROR("cannot open %s: rc = %d\n", CAPA_KEYS, rc);
- GOTO(put_last_rcvd, rc);
+ GOTO(disconnect_exports, rc);
}
RETURN(0);
put_ck_object:
lu_object_put(env, &o->do_lu);
mdt->mdt_ck_obj = NULL;
-put_last_rcvd:
- lu_object_put(env, &mdt->mdt_last_rcvd->do_lu);
- mdt->mdt_last_rcvd = NULL;
+disconnect_exports:
+ class_disconnect_exports(obd);
return rc;
}
/* Remove transaction callback */
dt_txn_callback_del(mdt->mdt_bottom, &mdt->mdt_txn_cb);
- if (mdt->mdt_last_rcvd)
- lu_object_put(env, &mdt->mdt_last_rcvd->do_lu);
- mdt->mdt_last_rcvd = NULL;
if (mdt->mdt_ck_obj)
lu_object_put(env, &mdt->mdt_ck_obj->do_lu);
mdt->mdt_ck_obj = NULL;
spin_unlock(&exp->exp_lock);
}
+/**
+ * VBR: restore versions
+ *
+ * Copy the four pre-operation object versions saved in the client's
+ * last_rcvd slot (\a lcd) back into the reply message, so a
+ * reconstructed reply carries the same versions as the original one.
+ * NOTE(review): fields are copied without le64_to_cpu(); the
+ * surrounding MDT code (e.g. mdt_req_from_lcd reading
+ * lcd_last_transno) also treats in-memory lcd as host order — confirm
+ * this convention holds for lcd_pre_versions as well.
+ */
+void mdt_vbr_reconstruct(struct ptlrpc_request *req,
+ struct lsd_client_data *lcd)
+{
+ __u64 pre_versions[4] = {0};
+ pre_versions[0] = lcd->lcd_pre_versions[0];
+ pre_versions[1] = lcd->lcd_pre_versions[1];
+ pre_versions[2] = lcd->lcd_pre_versions[2];
+ pre_versions[3] = lcd->lcd_pre_versions[3];
+ lustre_msg_set_versions(req->rq_repmsg, pre_versions);
+}
+
void mdt_req_from_lcd(struct ptlrpc_request *req,
struct lsd_client_data *lcd)
{
lustre_msg_get_opc(req->rq_repmsg) == MDS_DONE_WRITING) {
req->rq_transno = lcd->lcd_last_close_transno;
req->rq_status = lcd->lcd_last_close_result;
- lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
- lustre_msg_set_status(req->rq_repmsg, req->rq_status);
} else {
req->rq_transno = lcd->lcd_last_transno;
req->rq_status = lcd->lcd_last_result;
- lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
- lustre_msg_set_status(req->rq_repmsg, req->rq_status);
+ mdt_vbr_reconstruct(req, lcd);
}
+ if (req->rq_status != 0)
+ req->rq_transno = 0;
+ lustre_msg_set_transno(req->rq_repmsg, req->rq_transno);
+ lustre_msg_set_status(req->rq_repmsg, req->rq_status);
+ DEBUG_REQ(D_RPCTRACE, req, "restoring transno "LPD64"/status %d",
+ req->rq_transno, req->rq_status);
+
mdt_steal_ack_locks(req);
}
RETURN(rc);
}
+/**
+ * VBR: read the current version of the object stored in
+ * info->mti_mos[\a index], check it against the pre-version the client
+ * sent when the request is a replay (reqmsg transno != 0), and save it
+ * into the reply so the client can record it for future replay.
+ *
+ * Returns 0 on success or when the export does not support VBR;
+ * -EOVERFLOW on version mismatch or malformed request buffer (in both
+ * failure cases exp_vbr_failed is raised so the client is evicted from
+ * recovery).
+ */
+int mdt_version_get_check(struct mdt_thread_info *info, int index)
+{
+ /** version recovery */
+ struct md_object *mo;
+ struct ptlrpc_request *req = mdt_info_req(info);
+ __u64 curr_version, *pre_versions;
+ ENTRY;
+
+ if (!exp_connect_vbr(req->rq_export))
+ RETURN(0);
+
+ LASSERT(info->mti_mos[index]);
+ LASSERT(mdt_object_exists(info->mti_mos[index]));
+ mo = mdt_object_child(info->mti_mos[index]);
+
+ curr_version = mo_version_get(info->mti_env, mo);
+ CDEBUG(D_INODE, "Version is "LPX64"\n", curr_version);
+ /** VBR: version is checked always because costs nothing */
+ if (lustre_msg_get_transno(req->rq_reqmsg) != 0) {
+ pre_versions = lustre_msg_get_versions(req->rq_reqmsg);
+ LASSERT(index < PTLRPC_NUM_VERSIONS);
+ /** Sanity check for malformed buffers */
+ if (pre_versions == NULL) {
+ CERROR("No versions in request buffer\n");
+ spin_lock(&req->rq_export->exp_lock);
+ req->rq_export->exp_vbr_failed = 1;
+ spin_unlock(&req->rq_export->exp_lock);
+ RETURN(-EOVERFLOW);
+ } else if (pre_versions[index] != curr_version) {
+ CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
+ pre_versions[index], curr_version);
+ spin_lock(&req->rq_export->exp_lock);
+ req->rq_export->exp_vbr_failed = 1;
+ spin_unlock(&req->rq_export->exp_lock);
+ RETURN(-EOVERFLOW);
+ }
+ }
+ /** save pre-versions in reply */
+ LASSERT(req->rq_repmsg != NULL);
+ pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+ if (pre_versions)
+ pre_versions[index] = curr_version;
+ RETURN(0);
+}
+
static int mdt_md_create(struct mdt_thread_info *info)
{
struct mdt_device *mdt = info->mti_mdt;
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_CREATE_WRITE);
+ info->mti_mos[0] = parent;
+ info->mti_mos[1] = child;
+ rc = mdt_version_get_check(info, 0);
+ if (rc)
+ GOTO(out_put_child, rc);
+
/* Let lower layer know current lock mode. */
info->mti_spec.sp_cr_mode =
mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
mdt_pack_attr2body(info, repbody, &ma->ma_attr,
mdt_object_fid(child));
}
+out_put_child:
mdt_object_put(info->mti_env, child);
} else
rc = PTR_ERR(child);
struct md_attr *ma = &info->mti_attr;
struct mdt_lock_handle *lh;
int som_update = 0;
+ int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID);
int rc;
ENTRY;
if (unlikely(ma->ma_attr.la_valid == LA_CTIME))
ma->ma_attr_flags |= MDS_VTX_BYPASS;
+ /* VBR: update version if attr changed are important for recovery */
+ if (do_vbr) {
+ info->mti_mos[0] = mo;
+ rc = mdt_version_get_check(info, 0);
+ if (rc)
+ GOTO(out_unlock, rc);
+ }
+
/* all attrs are packed into mti_attr in unpack_setattr */
rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
if (rc != 0)
if (IS_ERR(mo))
GOTO(out, rc = PTR_ERR(mo));
+ /* start a log jounal handle if needed */
if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
if ((ma->ma_attr.la_valid & LA_SIZE) ||
(rr->rr_flags & MRF_SETATTR_LOCKED)) {
GOTO(out, rc);
}
+ info->mti_mos[0] = mp;
+ rc = mdt_version_get_check(info, 0);
+ if (rc)
+ GOTO(out_unlock_parent, rc);
+
mdt_reint_init_ma(info, ma);
if (!ma->ma_lmm || !ma->ma_cookie)
GOTO(out_unlock_parent, rc = -EINVAL);
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_UNLINK_WRITE);
+ info->mti_mos[1] = mc;
+ rc = mdt_version_get_check(info, 1);
+ if (rc)
+ GOTO(out_unlock_child, rc);
+
/*
* Now we can only make sure we need MA_INODE, in mdd layer, will check
* whether need MA_LOV and MA_COOKIE.
mdt_handle_last_unlink(info, mc, ma);
EXIT;
+out_unlock_child:
mdt_object_unlock_put(info, mc, child_lh, rc);
out_unlock_parent:
mdt_object_unlock_put(info, mp, parent_lh, rc);
if (IS_ERR(mp))
RETURN(PTR_ERR(mp));
+ info->mti_mos[0] = mp;
+ rc = mdt_version_get_check(info, 0);
+ if (rc)
+ GOTO(out_unlock_parent, rc);
+
/* step 2: find & lock the source */
lhs = &info->mti_lh[MDT_LH_CHILD];
mdt_lock_reg_init(lhs, LCK_EX);
mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
OBD_FAIL_MDS_REINT_LINK_WRITE);
+ info->mti_mos[1] = ms;
+ rc = mdt_version_get_check(info, 1);
+ if (rc)
+ GOTO(out_unlock_child, rc);
+
lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
rc = mdo_link(info->mti_env, mdt_object_child(mp),
mdt_object_child(ms), lname, ma);
EXIT;
+out_unlock_child:
mdt_object_unlock_put(info, ms, lhs, rc);
out_unlock_parent:
mdt_object_unlock_put(info, mp, lhp, rc);
if (IS_ERR(msrcdir))
GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
+ info->mti_mos[0] = msrcdir;
+ rc = mdt_version_get_check(info, 0);
+ if (rc)
+ GOTO(out_unlock_source, rc);
+
/* step 2: find & lock the target dir. */
lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt,
rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
MDS_INODELOCK_UPDATE,
MDT_LOCAL_LOCK);
- if (rc != 0)
+ if (rc != 0) {
+ mdt_object_put(info->mti_env, mtgtdir);
+ GOTO(out_unlock_source, rc);
+ }
+
+ info->mti_mos[1] = mtgtdir;
+ rc = mdt_version_get_check(info, 1);
+ if (rc)
GOTO(out_unlock_target, rc);
}
}
mdt_object_put(info->mti_env, mold);
GOTO(out_unlock_target, rc);
}
+
+ info->mti_mos[2] = mold;
+ rc = mdt_version_get_check(info, 2);
+ if (rc)
+ GOTO(out_unlock_old, rc);
+
mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
/* step 4: find & lock the new object. */
mdt_object_put(info->mti_env, mnew);
GOTO(out_unlock_old, rc);
}
+
+ info->mti_mos[3] = mnew;
+ rc = mdt_version_get_check(info, 3);
+ if (rc)
+ GOTO(out_unlock_new, rc);
+
mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
} else if (rc != -EREMOTE && rc != -ENOENT)
GOTO(out_unlock_old, rc);
static const char user_string[] = "user.";
int size, rc;
ENTRY;
-
+
if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_PACK))
RETURN(-ENOMEM);
if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_XATTR) &&
!strncmp(xattr_name, user_string, sizeof(user_string) - 1))
RETURN(-EOPNOTSUPP);
-
+
size = mo_xattr_get(info->mti_env,
mdt_object_child(info->mti_object),
&LU_BUF_NULL, xattr_name);
if (IS_ERR(obj))
GOTO(out, rc = PTR_ERR(obj));
+ info->mti_mos[0] = obj;
+ rc = mdt_version_get_check(info, 0);
+ if (rc)
+ GOTO(out_unlock, rc);
+
if (unlikely(!(valid & OBD_MD_FLCTIME))) {
CWARN("client miss to set OBD_MD_FLCTIME when "
"setxattr: [object "DFID"] [valid %llu]\n",
result = 0;
list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
- if (cb->dtc_txn_start == NULL)
+ if (cb->dtc_txn_start == NULL ||
+ !(cb->dtc_tag & env->le_ctx.lc_tags))
continue;
result = cb->dtc_txn_start(env, param, cb->dtc_cookie);
if (result < 0)
result = 0;
list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
- if (cb->dtc_txn_stop == NULL)
+ if (cb->dtc_txn_stop == NULL ||
+ !(cb->dtc_tag & env->le_ctx.lc_tags))
continue;
result = cb->dtc_txn_stop(env, txn, cb->dtc_cookie);
if (result < 0)
result = 0;
list_for_each_entry(cb, &dev->dd_txn_callbacks, dtc_linkage) {
- if (cb->dtc_txn_commit == NULL)
+ if (cb->dtc_txn_commit == NULL ||
+ !(cb->dtc_tag & env->le_ctx.lc_tags))
continue;
result = cb->dtc_txn_commit(env, txn, cb->dtc_cookie);
if (result < 0)
lu_context_key_degister(&dt_key);
}
+/**
+ * Read exactly buf->lb_len bytes from \a dt at offset *pos via the
+ * object's body ops (capa checks bypassed).
+ * Returns 0 on a full read, -EFAULT on a short read, or the negative
+ * error from dbo_read.  *pos is advanced by the underlying read.
+ */
+int dt_record_read(const struct lu_env *env, struct dt_object *dt,
+ struct lu_buf *buf, loff_t *pos)
+{
+ int rc;
+
+ LASSERTF(dt != NULL, "dt is NULL when we want to read record\n");
+
+ rc = dt->do_body_ops->dbo_read(env, dt, buf, pos, BYPASS_CAPA);
+
+ /* treat any partial read as corruption of the record */
+ if (rc == buf->lb_len)
+ rc = 0;
+ else if (rc >= 0)
+ rc = -EFAULT;
+ return rc;
+}
+EXPORT_SYMBOL(dt_record_read);
+
+/**
+ * Write exactly buf->lb_len bytes to \a dt at offset *pos inside
+ * transaction \a th (capa checks bypassed, "ignore quota" flag set).
+ * Returns 0 on a full write, -EFAULT on a short write, or the negative
+ * error from dbo_write.  *pos is advanced by the underlying write.
+ */
+int dt_record_write(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, loff_t *pos, struct thandle *th)
+{
+ int rc;
+
+ LASSERTF(dt != NULL, "dt is NULL when we want to write record\n");
+ LASSERT(th != NULL);
+ rc = dt->do_body_ops->dbo_write(env, dt, buf, pos, th, BYPASS_CAPA, 1);
+ /* a partial write of a record is an error for the caller */
+ if (rc == buf->lb_len)
+ rc = 0;
+ else if (rc >= 0)
+ rc = -EFAULT;
+ return rc;
+}
+EXPORT_SYMBOL(dt_record_write);
+
const struct dt_index_features dt_directory_features;
EXPORT_SYMBOL(dt_directory_features);
ptlrpc_put_connection_superhack(exp->exp_connection);
LASSERT(list_empty(&exp->exp_outstanding_replies));
+ LASSERT(list_empty(&exp->exp_uncommitted_replies));
LASSERT(list_empty(&exp->exp_req_replay_queue));
LASSERT(list_empty(&exp->exp_queued_rpc));
obd_destroy_export(exp);
atomic_set(&export->exp_rpc_count, 0);
export->exp_obd = obd;
CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
+ spin_lock_init(&export->exp_uncommitted_replies_lock);
+ CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
exp->exp_obd->obd_num_exports--;
spin_unlock(&exp->exp_obd->obd_dev_lock);
+ /* Keep these counter valid always */
+ spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
+ if (exp->exp_delayed)
+ exp->exp_obd->obd_delayed_clients--;
+ else if (exp->exp_in_recovery)
+ exp->exp_obd->obd_recoverable_clients--;
+ else if (exp->exp_obd->obd_recovering)
+ exp->exp_obd->obd_max_recoverable_clients--;
+ spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
class_export_put(exp);
}
EXPORT_SYMBOL(class_unlink_export);
ENTRY;
/* Move all of the exports from obd_exports to a work list, en masse. */
+ CFS_INIT_LIST_HEAD(&work_list);
spin_lock(&obd->obd_dev_lock);
- list_add(&work_list, &obd->obd_exports);
- list_del_init(&obd->obd_exports);
+ list_splice_init(&obd->obd_exports, &work_list);
+ list_splice_init(&obd->obd_delayed_exports, &work_list);
spin_unlock(&obd->obd_dev_lock);
if (!list_empty(&work_list)) {
if (test_export(exp))
continue;
- list_del(&exp->exp_obd_chain);
- list_add(&exp->exp_obd_chain, &work_list);
+ list_move(&exp->exp_obd_chain, &work_list);
/* don't count self-export as client */
if (obd_uuid_equals(&exp->exp_client_uuid,
&exp->exp_obd->obd_uuid))
}
olg->olg_ctxts[ctxt->loc_idx] = NULL;
spin_unlock(&olg->olg_lock);
-
+
if (ctxt->loc_lcm)
lcm_put(ctxt->loc_lcm);
-
+
obd = ctxt->loc_obd;
spin_lock(&obd->obd_dev_lock);
spin_unlock(&obd->obd_dev_lock); /* sync with llog ctxt user thread */
LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
"obd %p obd_magic %08X != %08X\n",
obd, obd->obd_magic, OBD_DEVICE_MAGIC);
- LASSERTF(strncmp(obd->obd_name, name, strlen(name)) == 0, "%p obd_name %s != %s\n",
- obd, obd->obd_name, name);
+ LASSERTF(strncmp(obd->obd_name, name, strlen(name)) == 0,
+ "%p obd_name %s != %s\n", obd, obd->obd_name, name);
rwlock_init(&obd->obd_pool_lock);
obd->obd_pool_limit = 0;
obd->obd_pool_slv = 0;
CFS_INIT_LIST_HEAD(&obd->obd_exports);
+ CFS_INIT_LIST_HEAD(&obd->obd_delayed_exports);
CFS_INIT_LIST_HEAD(&obd->obd_exports_timed);
CFS_INIT_LIST_HEAD(&obd->obd_nid_stats);
spin_lock_init(&obd->obd_nid_lock);
llog_group_init(&obd->obd_olg, FILTER_GROUP_LLOG);
- spin_lock_init(&obd->obd_uncommitted_replies_lock);
- CFS_INIT_LIST_HEAD(&obd->obd_uncommitted_replies);
-
len = strlen(uuid);
if (len >= sizeof(obd->obd_uuid)) {
CERROR("uuid must be < %d bytes long\n",
/* Leave this on forever */
obd->obd_stopping = 1;
spin_unlock(&obd->obd_dev_lock);
-
+
if (lcfg->lcfg_bufcount >= 2 && LUSTRE_CFG_BUFLEN(lcfg, 1) > 0) {
for (flag = lustre_cfg_string(lcfg, 1); *flag != 0; flag++)
switch (*flag) {
ldlm_timeout = max(lcfg->lcfg_num, 1U);
if (ldlm_timeout >= obd_timeout)
ldlm_timeout = max(obd_timeout / 3, 1U);
-
+
GOTO(out, err = 0);
}
case LCFG_SET_UPCALL: {
static void filter_commit_cb(struct obd_device *obd, __u64 transno,
void *cb_data, int error)
{
- obd_transno_commit_cb(obd, transno, error);
+ struct obd_export *exp = cb_data;
+ obd_transno_commit_cb(obd, transno, exp, error);
+}
+
+/**
+ * VBR (OST side): read the current version of \a inode and compare it
+ * with the pre-version carried in \a oti from the replayed request.
+ * On mismatch raise exp_vbr_failed and return -EOVERFLOW; otherwise
+ * store the current version back into oti->oti_pre_version for the
+ * reply.  Returns 0 when inode/oti are absent or the backend does not
+ * support versions (-EOPNOTSUPP sentinel).
+ * NOTE(review): uses RETURN() without a matching ENTRY — debug-trace
+ * nesting will be off by one here; confirm this is acceptable.
+ */
+int filter_version_get_check(struct obd_export *exp,
+ struct obd_trans_info *oti, struct inode *inode)
+{
+ __u64 curr_version;
+
+ if (inode == NULL || oti == NULL)
+ RETURN(0);
+
+ curr_version = fsfilt_get_version(exp->exp_obd, inode);
+ /* backend reports "no version support" as -EOPNOTSUPP in-band */
+ if ((__s64)curr_version == -EOPNOTSUPP)
+ RETURN(0);
+ /* VBR: version is checked always because costs nothing */
+ if (oti->oti_pre_version != 0 &&
+ oti->oti_pre_version != curr_version) {
+ CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
+ oti->oti_pre_version, curr_version);
+ spin_lock(&exp->exp_lock);
+ exp->exp_vbr_failed = 1;
+ spin_unlock(&exp->exp_lock);
+ RETURN (-EOVERFLOW);
+ }
+ oti->oti_pre_version = curr_version;
+ RETURN(0);
+}
/* Assumes caller has already pushed us into the kernel context. */
-int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
- int rc, int force_sync)
+int filter_finish_transno(struct obd_export *exp, struct inode *inode,
+ struct obd_trans_info *oti, int rc, int force_sync)
{
struct filter_obd *filter = &exp->exp_obd->u.filter;
struct filter_export_data *fed = &exp->exp_filter_data;
RETURN(rc);
/* we don't allocate new transnos for replayed requests */
+ spin_lock(&filter->fo_translock);
if (oti->oti_transno == 0) {
- spin_lock(&filter->fo_translock);
last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1;
filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
- spin_unlock(&filter->fo_translock);
- oti->oti_transno = last_rcvd;
} else {
- spin_lock(&filter->fo_translock);
last_rcvd = oti->oti_transno;
if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno))
filter->fo_fsd->lsd_last_transno =
cpu_to_le64(last_rcvd);
+ }
+ oti->oti_transno = last_rcvd;
+ if (last_rcvd <= le64_to_cpu(lcd->lcd_last_transno)) {
spin_unlock(&filter->fo_translock);
+ LBUG();
}
lcd->lcd_last_transno = cpu_to_le64(last_rcvd);
+ lcd->lcd_pre_versions[0] = cpu_to_le64(oti->oti_pre_version);
+ lcd->lcd_last_xid = cpu_to_le64(oti->oti_xid);
+ spin_unlock(&filter->fo_translock);
- /* could get xid from oti, if it's ever needed */
- lcd->lcd_last_xid = 0;
+ if (inode)
+ fsfilt_set_version(exp->exp_obd, inode, last_rcvd);
off = fed->fed_lr_off;
if (off <= 0) {
last_rcvd,
oti->oti_handle,
filter_commit_cb,
- NULL);
+ exp);
err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp,
lcd, sizeof(*lcd), &off,
force_sync | exp->exp_need_sync);
if (force_sync)
- filter_commit_cb(exp->exp_obd, last_rcvd, NULL, err);
+ filter_commit_cb(exp->exp_obd, last_rcvd, exp, err);
}
if (err) {
log_pri = D_ERROR;
rc = PTR_ERR(handle);
CERROR("unable to start transaction: rc %d\n", rc);
} else {
+ fed->fed_lcd->lcd_last_epoch =
+ filter->fo_fsd->lsd_start_epoch;
+ exp->exp_last_request_time = cfs_time_current_sec();
rc = fsfilt_add_journal_cb(obd, 0, handle,
target_client_add_cb, exp);
if (rc == 0) {
static int filter_free_server_data(struct filter_obd *filter)
{
- OBD_FREE(filter->fo_fsd, sizeof(*filter->fo_fsd));
+ OBD_FREE_PTR(filter->fo_fsd);
filter->fo_fsd = NULL;
OBD_FREE(filter->fo_last_rcvd_slots, LR_MAX_CLIENTS / 8);
filter->fo_last_rcvd_slots = NULL;
CDEBUG(D_INODE, "server last_mount: "LPU64"\n",
le64_to_cpu(fsd->lsd_mount_count));
- fsd->lsd_compat14 = fsd->lsd_last_transno;
rc = fsfilt_write_record(obd, filp, fsd, sizeof(*fsd), &off, force_sync);
if (rc)
CERROR("error writing lr_server_data: rc = %d\n", rc);
struct inode *inode = filp->f_dentry->d_inode;
unsigned long last_rcvd_size = i_size_read(inode);
__u64 mount_count;
+ __u32 start_epoch;
int cl_idx;
loff_t off = 0;
int rc;
GOTO(err_fsd, rc = -EINVAL);
}
- CDEBUG(D_INODE, "%s: server last_transno : "LPU64"\n",
+ start_epoch = le32_to_cpu(fsd->lsd_start_epoch);
+
+ CDEBUG(D_INODE, "%s: server start_epoch : %#x\n",
+ obd->obd_name, start_epoch);
+ CDEBUG(D_INODE, "%s: server last_transno : "LPX64"\n",
obd->obd_name, le64_to_cpu(fsd->lsd_last_transno));
CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
obd->obd_name, mount_count + 1);
/* can't fail for existing client */
LASSERTF(rc == 0, "rc = %d\n", rc);
- lcd = NULL;
+ /* VBR: set export last committed */
+ exp->exp_last_committed = last_rcvd;
spin_lock(&exp->exp_lock);
exp->exp_connecting = 0;
exp->exp_in_recovery = 0;
spin_unlock(&exp->exp_lock);
+ spin_lock_bh(&obd->obd_processing_task_lock);
obd->obd_max_recoverable_clients++;
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ lcd = NULL;
class_export_put(exp);
}
obd->obd_last_committed = le64_to_cpu(fsd->lsd_last_transno);
- target_recovery_init(obd, ost_handle);
+ target_recovery_init(&filter->fo_lut, ost_handle);
out:
filter->fo_mount_count = mount_count + 1;
GOTO(err_filp, rc = -EOPNOTSUPP);
}
+ /** lu_target has very limited use in filter now */
+ lut_init(NULL, &filter->fo_lut, obd, NULL);
+
rc = filter_init_server_data(obd, file);
if (rc) {
CERROR("cannot read %s: rc = %d\n", LAST_RCVD, rc);
spin_lock_init(&filter->fo_llog_list_lock);
filter->fo_fl_oss_capa = 1;
+
CFS_INIT_LIST_HEAD(&filter->fo_capa_keys);
filter->fo_capa_hash = init_capa_hash();
if (filter->fo_capa_hash == NULL)
- GOTO(err_ops, rc = -ENOMEM);
+ GOTO(err_post, rc = -ENOMEM);
sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
obd->obd_namespace = ldlm_namespace_new(obd, ns_name, LDLM_NAMESPACE_SERVER,
case OBD_CLEANUP_EARLY:
break;
case OBD_CLEANUP_EXPORTS:
+ /* Stop recovery before namespace cleanup. */
+ target_stop_recovery_thread(obd);
target_cleanup_recovery(obd);
rc = filter_llog_preclean(obd);
break;
lprocfs_obd_cleanup(obd);
lquota_cleanup(filter_quota_interface_ref, obd);
- /* Stop recovery before namespace cleanup. */
- target_stop_recovery_thread(obd);
- target_cleanup_recovery(obd);
-
ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
obd->obd_namespace = NULL;
static int filter_ping(struct obd_export *exp)
{
filter_fmd_expire(exp);
-
return 0;
}
old_size = i_size_read(inode);
}
+ /* VBR: version recovery check */
+ rc = filter_version_get_check(exp, oti, inode);
+ if (rc)
+ GOTO(out_unlock, rc);
+
/* If the inode still has SUID+SGID bits set (see filter_precreate())
* then we will accept the UID+GID sent by the client during write for
* initializing the ownership of this inode. We only allow this to
* sure we have one left for the last_rcvd update. */
err = fsfilt_extend(exp->exp_obd, inode, 1, handle);
- rc = filter_finish_transno(exp, oti, rc, sync);
+ rc = filter_finish_transno(exp, inode, oti, rc, sync);
if (sync) {
filter_cancel_cookies_cb(exp->exp_obd, 0, fcc, rc);
fcc = NULL;
* (see BUG 4180) -bzzz
*/
LOCK_INODE_MUTEX(dchild->d_inode);
+
+ /* VBR: version recovery check */
+ rc = filter_version_get_check(exp, oti, dchild->d_inode);
+ if (rc)
+ GOTO(cleanup, rc);
+
handle = fsfilt_start_log(obd, dchild->d_inode, FSFILT_OP_SETATTR,
NULL, 1);
if (IS_ERR(handle)) {
* on commit. then we call callback directly to free
* the fcc.
*/
- rc = filter_finish_transno(exp, oti, rc, sync);
+ rc = filter_finish_transno(exp, NULL, oti, rc, sync);
if (sync) {
filter_cancel_cookies_cb(obd, 0, fcc, rc);
fcc = NULL;
#ifdef __KERNEL__
# include <linux/spinlock.h>
#endif
-#include <lustre_disk.h>
#include <lustre_handles.h>
#include <lustre_debug.h>
#include <obd.h>
extern int *obdfilter_created_scratchpad;
extern void target_recovery_fini(struct obd_device *obd);
-extern void target_recovery_init(struct obd_device *obd,
+extern void target_recovery_init(struct lu_target *lut,
svc_handler_t handler);
/* filter.c */
const char *what, int quiet);
#define filter_oa2dentry(obd, oa) __filter_oa2dentry(obd, oa, __FUNCTION__, 0)
-int filter_finish_transno(struct obd_export *, struct obd_trans_info *, int rc,
- int force_sync);
+int filter_finish_transno(struct obd_export *, struct inode *,
+ struct obd_trans_info *, int rc, int force_sync);
__u64 filter_next_id(struct filter_obd *, struct obdo *);
__u64 filter_last_id(struct filter_obd *, obd_gr group);
int filter_update_fidea(struct obd_export *exp, struct inode *inode,
UNLOCK_INODE_MUTEX(inode);
- rc2 = filter_finish_transno(exp, oti, 0, 0);
+ rc2 = filter_finish_transno(exp, inode, oti, 0, 0);
if (rc2 != 0) {
CERROR("can't close transaction: %d\n", rc2);
if (rc == 0)
RETURN(rc);
}
+/*
+ * Get the 64-bit version for an inode.
+ * Reads i_fs_version straight from the ldiskfs in-memory inode; no
+ * locking is taken here — assumes the caller serializes against
+ * concurrent version updates (TODO confirm).
+ */
+static dt_obj_version_t osd_object_version_get(const struct lu_env *env,
+ struct dt_object *dt)
+{
+ struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+ CDEBUG(D_INFO, "Get version "LPX64" for inode %lu\n",
+ LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+ return LDISKFS_I(inode)->i_fs_version;
+}
+
+/*
+ * Set the 64-bit version for an inode.  The previous value is only
+ * logged via CDEBUG; nothing is returned (the function is void).
+ */
+static void osd_object_version_set(const struct lu_env *env, struct dt_object *dt,
+ dt_obj_version_t new_version)
+{
+ struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+ CDEBUG(D_INFO, "Set version "LPX64" (old "LPX64") for inode %lu\n",
+ new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
+ LDISKFS_I(inode)->i_fs_version = new_version;
+ /** Version is set after all inode operations are finished,
+ * so we should mark it dirty here */
+ inode->i_sb->s_op->dirty_inode(inode);
+}
+
static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
void **data)
{
.do_xattr_list = osd_xattr_list,
.do_capa_get = osd_capa_get,
.do_object_sync = osd_object_sync,
+ .do_version_get = osd_object_version_get,
+ .do_version_set = osd_object_version_set,
.do_data_get = osd_data_get,
};
.do_xattr_list = osd_xattr_list,
.do_capa_get = osd_capa_get,
.do_object_sync = osd_object_sync,
+ .do_version_get = osd_object_version_get,
+ .do_version_set = osd_object_version_set,
.do_data_get = osd_data_get,
};
ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o
ptlrpc_objs += pers.o lproc_ptlrpc.o wiretest.o layout.o
ptlrpc_objs += sec.o sec_bulk.o sec_gc.o sec_config.o sec_lproc.o
-ptlrpc_objs += sec_null.o sec_plain.o
+ptlrpc_objs += sec_null.o sec_plain.o target.o
ptlrpc-objs := $(ldlm_objs) $(ptlrpc_objs)
}
/**
+ * save pre-versions for replay
+ *
+ * Copy the pre-operation versions the server placed in the reply into
+ * the request message, so that if this request is later replayed the
+ * server can verify the objects are in the same state.  Skipped when
+ * the request itself is a replay (versions already present).
+ */
+static void ptlrpc_save_versions(struct ptlrpc_request *req)
+{
+ struct lustre_msg *repmsg = req->rq_repmsg;
+ struct lustre_msg *reqmsg = req->rq_reqmsg;
+ __u64 *versions = lustre_msg_get_versions(repmsg);
+ ENTRY;
+
+ /* NOTE(review): this early return skips EXIT, so the debug trace
+ * nesting is unbalanced on the replay path — confirm intended */
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+ return;
+
+ LASSERT(versions);
+ lustre_msg_set_versions(reqmsg, versions);
+ CDEBUG(D_INFO, "Client save versions ["LPX64"/"LPX64"]\n",
+ versions[0], versions[1]);
+
+ EXIT;
+}
+
+/**
* Callback function called when client receives RPC reply for \a req.
*/
static int after_reply(struct ptlrpc_request *req)
lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
}
- if (req->rq_import->imp_replayable) {
+ if (imp->imp_replayable) {
spin_lock(&imp->imp_lock);
/*
* No point in adding already-committed requests to the replay
if (req->rq_transno != 0 &&
(req->rq_transno >
lustre_msg_get_last_committed(req->rq_repmsg) ||
- req->rq_replay))
+ req->rq_replay)) {
+ /** version recovery */
+ ptlrpc_save_versions(req);
ptlrpc_retain_replayable_request(req, imp);
- else if (req->rq_commit_cb != NULL) {
+ } else if (req->rq_commit_cb != NULL) {
spin_unlock(&imp->imp_lock);
req->rq_commit_cb(req);
spin_lock(&imp->imp_lock);
lustre_msg_get_status(req->rq_repmsg) == -ENODEV))
GOTO(out, rc = lustre_msg_get_status(req->rq_repmsg));
- /* The transno had better not change over replay. */
- LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
- lustre_msg_get_transno(req->rq_repmsg) ||
- lustre_msg_get_transno(req->rq_repmsg) == 0,
- LPX64"/"LPX64"\n",
- lustre_msg_get_transno(req->rq_reqmsg),
- lustre_msg_get_transno(req->rq_repmsg));
+ /** VBR: check version failure */
+ if (lustre_msg_get_status(req->rq_repmsg) == -EOVERFLOW) {
+ /** replay was failed due to version mismatch */
+ DEBUG_REQ(D_WARNING, req, "Version mismatch during replay\n");
+ spin_lock(&imp->imp_lock);
+ imp->imp_vbr_failed = 1;
+ imp->imp_no_lock_replay = 1;
+ spin_unlock(&imp->imp_lock);
+ } else {
+ /** The transno had better not change over replay. */
+ LASSERTF(lustre_msg_get_transno(req->rq_reqmsg) ==
+ lustre_msg_get_transno(req->rq_repmsg) ||
+ lustre_msg_get_transno(req->rq_repmsg) == 0,
+ LPX64"/"LPX64"\n",
+ lustre_msg_get_transno(req->rq_reqmsg),
+ lustre_msg_get_transno(req->rq_repmsg));
+ }
+
+ spin_lock(&imp->imp_lock);
+ /** if replays by version then gap was occur on server, no trust to locks */
+ if (lustre_msg_get_flags(req->rq_repmsg) & MSG_VERSION_REPLAY)
+ imp->imp_no_lock_replay = 1;
+ imp->imp_last_replay_transno = lustre_msg_get_transno(req->rq_reqmsg);
+ spin_unlock(&imp->imp_lock);
+ LASSERT(imp->imp_last_replay_transno);
DEBUG_REQ(D_HA, req, "got rep");
{
ENTRY;
atomic_dec(&req->rq_import->imp_replay_inflight);
- if (req->rq_status == 0) {
+ if (req->rq_status == 0 &&
+ !req->rq_import->imp_vbr_failed) {
ptlrpc_import_recovery_state_machine(req->rq_import);
} else {
- CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
- "reconnecting\n",
- req->rq_import->imp_obd->obd_name, req->rq_status);
+ if (req->rq_import->imp_vbr_failed) {
+ CDEBUG(D_WARNING,
+ "%s: version recovery fails, reconnecting\n",
+ req->rq_import->imp_obd->obd_name);
+ spin_lock(&req->rq_import->imp_lock);
+ req->rq_import->imp_vbr_failed = 0;
+ spin_unlock(&req->rq_import->imp_lock);
+ } else {
+ CDEBUG(D_HA, "%s: LAST_REPLAY message error: %d, "
+ "reconnecting\n",
+ req->rq_import->imp_obd->obd_name,
+ req->rq_status);
+ }
ptlrpc_connect_import(req->rq_import, NULL);
}
return lustre_swab_buf(req->rq_repmsg, index, min_size, swabber);
}
+/* Return the ptlrpc_body buffer of a V2 message, or NULL if the buffer
+ * at MSG_PTLRPC_BODY_OFF is missing or too small.  Shared helper that
+ * replaces the repeated lustre_msg_buf_v2() calls in the accessors
+ * below. */
+static inline struct ptlrpc_body *lustre_msg_ptlrpc_body(struct lustre_msg *msg)
+{
+ return lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF,
+ sizeof(struct ptlrpc_body));
+}
+
__u32 lustre_msghdr_get_flags(struct lustre_msg *msg)
{
switch (msg->lm_magic) {
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_flags |= flags;
return;
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_flags = flags;
return;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_flags &= ~(MSG_GEN_FLAG_MASK & flags);
return;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_op_flags |= flags;
return;
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_op_flags |= flags;
return;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return NULL;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return PTL_RPC_MSG_ERR;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_version |= version;
return;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
}
}
+__u64 *lustre_msg_get_versions(struct lustre_msg *msg)
+{
+ switch (msg->lm_magic) {
+ case LUSTRE_MSG_MAGIC_V1:
+ return NULL;
+ case LUSTRE_MSG_MAGIC_V2: {
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+ if (!pb) {
+ CERROR("invalid msg %p: no ptlrpc body!\n", msg);
+ return NULL;
+ }
+ return pb->pb_pre_versions;
+ }
+ default:
+ CERROR("incorrect message magic: %08x\n", msg->lm_magic);
+ return NULL;
+ }
+}
+
__u64 lustre_msg_get_transno(struct lustre_msg *msg)
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return -EINVAL;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return -EINVAL;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return -EINVAL;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return;
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
return 0;
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
return 0;
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
return 0;
case LUSTRE_MSG_MAGIC_V2:
case LUSTRE_MSG_MAGIC_V2_SWABBED: {
- struct ptlrpc_body *pb;
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
return crc32_le(~(__u32)0, (unsigned char *)pb, sizeof(*pb));
}
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_handle = *handle;
return;
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_type = type;
return;
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_opc = opc;
return;
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_last_xid = last_xid;
return;
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_last_committed = last_committed;
return;
}
}
-void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno)
+void lustre_msg_set_versions(struct lustre_msg *msg, __u64 *versions)
{
switch (msg->lm_magic) {
+ case LUSTRE_MSG_MAGIC_V1:
+ return;
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+ LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
+ pb->pb_pre_versions[0] = versions[0];
+ pb->pb_pre_versions[1] = versions[1];
+ pb->pb_pre_versions[2] = versions[2];
+ pb->pb_pre_versions[3] = versions[3];
+ return;
+ }
+ default:
+ LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
+ }
+}
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno)
+{
+ switch (msg->lm_magic) {
+ case LUSTRE_MSG_MAGIC_V2: {
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_transno = transno;
return;
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_status = status;
return;
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_conn_cnt = conn_cnt;
return;
case LUSTRE_MSG_MAGIC_V1:
return;
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_timeout = timeout;
return;
case LUSTRE_MSG_MAGIC_V1:
return;
case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb;
-
- pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_service_time = service_time;
return;
imp->imp_next_ping = cfs_time_current();
}
+static inline int imp_is_deactive(struct obd_import *imp)
+{
+ return (imp->imp_deactive ||
+ OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_IMP_DEACTIVE));
+}
+
static inline int ptlrpc_next_reconnect(struct obd_import *imp)
{
if (imp->imp_server_timeout)
this_ping) && force == 0)
return;
- if (level == LUSTRE_IMP_DISCON && !imp->imp_deactive) {
+ if (level == LUSTRE_IMP_DISCON && !imp_is_deactive(imp)) {
/* wait at least a timeout before trying recovery again */
imp->imp_next_ping = ptlrpc_next_reconnect(imp);
ptlrpc_initiate_recovery(imp);
} else if (level != LUSTRE_IMP_FULL ||
imp->imp_obd->obd_no_recov ||
- imp->imp_deactive) {
+ imp_is_deactive(imp)) {
CDEBUG(D_HA, "not pinging %s (in recovery "
" or recovery disabled: %s)\n",
obd2cli_tgt(imp->imp_obd),
CDEBUG(D_RPCTRACE, "checking import %s->%s\n",
imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd));
#ifdef ENABLE_LIBLUSTRE_RECOVERY
- if (imp->imp_state == LUSTRE_IMP_DISCON && !imp->imp_deactive)
+ if (imp->imp_state == LUSTRE_IMP_DISCON &&
+ !imp_is_deactive(imp))
#else
/*XXX only recover for the initial connection */
if (!lustre_handle_is_used(&imp->imp_remote_handle) &&
- imp->imp_state == LUSTRE_IMP_DISCON && !imp->imp_deactive)
+ imp->imp_state == LUSTRE_IMP_DISCON &&
+ !imp_is_deactive(imp))
#endif
ptlrpc_initiate_recovery(imp);
else if (imp->imp_state != LUSTRE_IMP_FULL)
"state %d, deactive %d\n",
imp->imp_obd->obd_uuid.uuid,
obd2cli_tgt(imp->imp_obd), imp->imp_state,
- imp->imp_deactive);
+ imp_is_deactive(imp));
}
EXIT;
#endif
EXPORT_SYMBOL(lustre_msg_get_opc);
EXPORT_SYMBOL(lustre_msg_get_last_xid);
EXPORT_SYMBOL(lustre_msg_get_last_committed);
+EXPORT_SYMBOL(lustre_msg_get_versions);
EXPORT_SYMBOL(lustre_msg_get_transno);
EXPORT_SYMBOL(lustre_msg_get_status);
EXPORT_SYMBOL(lustre_msg_get_slv);
EXPORT_SYMBOL(lustre_msg_set_opc);
EXPORT_SYMBOL(lustre_msg_set_last_xid);
EXPORT_SYMBOL(lustre_msg_set_last_committed);
+EXPORT_SYMBOL(lustre_msg_set_versions);
EXPORT_SYMBOL(lustre_msg_set_transno);
EXPORT_SYMBOL(lustre_msg_set_status);
EXPORT_SYMBOL(lustre_msg_set_conn_cnt);
static struct llog_canceld_ctxt *llcd_get(struct llog_ctxt *ctxt)
{
struct llog_canceld_ctxt *llcd;
-
+ LASSERT(ctxt);
llcd = llcd_alloc(ctxt->loc_lcm);
if (!llcd) {
CERROR("Can't alloc an llcd for ctxt %p\n", ctxt);
}
lcm = ctxt->loc_lcm;
CDEBUG(D_INFO, "cancel on lsm %p\n", lcm);
-
+
/*
* Let's check if we have all structures alive. We also check for
* possible shutdown. Do nothing if we're stopping.
req and send it again. If, however, the last sent
transno has been committed then we continue replay
from the next request. */
- if (imp->imp_resend_replay &&
- req->rq_transno == last_transno) {
- lustre_msg_add_flags(req->rq_reqmsg, MSG_RESENT);
- break;
- }
-
if (req->rq_transno > last_transno) {
- imp->imp_last_replay_transno = req->rq_transno;
+ if (imp->imp_resend_replay)
+ lustre_msg_add_flags(req->rq_reqmsg,
+ MSG_RESENT);
break;
}
-
req = NULL;
}
EXIT;
}
-void
-ptlrpc_commit_replies (struct obd_device *obd)
+void ptlrpc_commit_replies_alt(struct obd_export *exp)
{
- struct list_head *tmp;
- struct list_head *nxt;
+ struct ptlrpc_reply_state *rs, *nxt;
+ struct list_head committed_list;
+ DECLARE_RS_BATCH(batch);
+ ENTRY;
+
+ CFS_INIT_LIST_HEAD(&committed_list);
+ spin_lock(&exp->exp_uncommitted_replies_lock);
+ list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies,
+ rs_obd_list) {
+ LASSERT (rs->rs_difficult);
+ LASSERT(rs->rs_export);
+ if (likely(rs->rs_transno <= exp->exp_last_committed))
+ list_move(&rs->rs_obd_list, &committed_list);
+ else
+ break;
+ }
+ spin_unlock(&exp->exp_uncommitted_replies_lock);
+
+ /* XXX: do we need this in the context of a commit callback? Maybe a
+ * separate thread should work this out */
+ rs_batch_init(&batch);
+ /* get replies that have been committed and get their service
+ * to attend to complete them. */
+ list_for_each_entry_safe(rs, nxt, &committed_list, rs_obd_list) {
+ list_del_init(&rs->rs_obd_list);
+ rs_batch_add(&batch, rs);
+ }
+ rs_batch_fini(&batch);
+ EXIT;
+}
+void ptlrpc_commit_replies(struct obd_export *exp)
+{
+ struct ptlrpc_reply_state *rs, *nxt;
DECLARE_RS_BATCH(batch);
ENTRY;
* to attend to complete them. */
/* CAVEAT EMPTOR: spinlock ordering!!! */
- spin_lock(&obd->obd_uncommitted_replies_lock);
- list_for_each_safe (tmp, nxt, &obd->obd_uncommitted_replies) {
- struct ptlrpc_reply_state *rs =
- list_entry(tmp, struct ptlrpc_reply_state, rs_obd_list);
-
+ spin_lock(&exp->exp_uncommitted_replies_lock);
+ list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies,
+ rs_obd_list) {
LASSERT (rs->rs_difficult);
-
- if (rs->rs_transno <= obd->obd_last_committed) {
+ /* VBR: per-export last_committed */
+ LASSERT(rs->rs_export);
+ if (rs->rs_transno <= exp->exp_last_committed) {
list_del_init(&rs->rs_obd_list);
rs_batch_add(&batch, rs);
}
}
- spin_unlock(&obd->obd_uncommitted_replies_lock);
+ spin_unlock(&exp->exp_uncommitted_replies_lock);
rs_batch_fini(&batch);
EXIT;
}
array->paa_count = 0;
array->paa_deadline = -1;
- /* allocate memory for srv_at_array (ptlrpc_at_array) */
+ /* allocate memory for srv_at_array (ptlrpc_at_array) */
OBD_ALLOC(array->paa_reqs_array, sizeof(struct list_head) * size);
if (array->paa_reqs_array == NULL)
GOTO(failed, NULL);
for (index = 0; index < size; index++)
CFS_INIT_LIST_HEAD(&array->paa_reqs_array[index]);
-
+
OBD_ALLOC(array->paa_reqs_count, sizeof(__u32) * size);
if (array->paa_reqs_count == NULL)
GOTO(failed, NULL);
if (req->rq_at_linked) {
struct ptlrpc_at_array *array = &svc->srv_at_array;
__u32 index = req->rq_at_index;
-
- req->rq_at_linked = 0;
+
+ req->rq_at_linked = 0;
array->paa_reqs_count[index]--;
array->paa_count--;
}
rq->rq_at_linked = 0;
continue;
}
-
+
/* update the earliest deadline */
if (deadline == -1 || rq->rq_deadline < deadline)
deadline = rq->rq_deadline;
list_del_init (&rs->rs_exp_list);
spin_unlock (&exp->exp_lock);
- /* Avoid obd_uncommitted_replies_lock contention if we 100% sure that
+ /* Avoid exp_uncommitted_replies_lock contention if we 100% sure that
* rs has been removed from the list already */
if (!list_empty_careful(&rs->rs_obd_list)) {
- spin_lock(&obd->obd_uncommitted_replies_lock);
+ spin_lock(&exp->exp_uncommitted_replies_lock);
list_del_init(&rs->rs_obd_list);
- spin_unlock(&obd->obd_uncommitted_replies_lock);
+ spin_unlock(&exp->exp_uncommitted_replies_lock);
}
spin_lock(&rs->rs_lock);
cfs_timer_disarm(&service->srv_at_timer);
if (array->paa_reqs_array != NULL) {
- OBD_FREE(array->paa_reqs_array,
+ OBD_FREE(array->paa_reqs_array,
sizeof(struct list_head) * array->paa_size);
array->paa_reqs_array = NULL;
}
-
+
if (array->paa_reqs_count != NULL) {
- OBD_FREE(array->paa_reqs_count,
+ OBD_FREE(array->paa_reqs_count,
sizeof(__u32) * array->paa_size);
array->paa_reqs_count= NULL;
}
-
+
OBD_FREE_PTR(service);
RETURN(0);
}
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ *
+ * Lustre Common Target
+ * These are common functions for MDT and OST recovery-related functionality
+ *
+ * Author: Mikhail Pershin <tappro@sun.com>
+ */
+
+#include <obd.h>
+#include <lustre_fsfilt.h>
+/**
+ * Update client data in last_rcvd file. An obd API
+ */
+static int obt_client_data_update(struct obd_export *exp)
+{
+ struct lu_export_data *led = &exp->exp_target_data;
+ struct obd_device_target *obt = &exp->exp_obd->u.obt;
+ loff_t off = led->led_lr_off;
+ int rc = 0;
+
+ rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
+ led->led_lcd, sizeof(*led->led_lcd), &off, 0);
+
+ CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
+ led->led_lr_idx, le32_to_cpu(led->led_lcd->lcd_last_epoch),
+ le32_to_cpu(obt->obt_lsd->lsd_start_epoch));
+
+ return rc;
+}
+
+/**
+ * Update server data in last_rcvd file. An obd API
+ */
+int obt_server_data_update(struct obd_device *obd, int force_sync)
+{
+ struct obd_device_target *obt = &obd->u.obt;
+ loff_t off = 0;
+ int rc;
+ ENTRY;
+
+ CDEBUG(D_SUPER,
+ "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
+ obt->obt_lsd->lsd_uuid,
+ le64_to_cpu(obt->obt_lsd->lsd_mount_count),
+ le64_to_cpu(obt->obt_lsd->lsd_last_transno));
+
+ rc = fsfilt_write_record(obd, obt->obt_rcvd_filp, obt->obt_lsd,
+ sizeof(*obt->obt_lsd), &off, force_sync);
+ if (rc)
+ CERROR("error writing lr_server_data: rc = %d\n", rc);
+
+ RETURN(rc);
+}
+
+/**
+ * Update client epoch with server's one
+ */
+void obt_client_epoch_update(struct obd_export *exp)
+{
+ struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
+ struct obd_device_target *obt = &exp->exp_obd->u.obt;
+
+ /** VBR: set client last_epoch to current epoch */
+ if (le32_to_cpu(lcd->lcd_last_epoch) >=
+ le32_to_cpu(obt->obt_lsd->lsd_start_epoch))
+ return;
+ lcd->lcd_last_epoch = obt->obt_lsd->lsd_start_epoch;
+ obt_client_data_update(exp);
+}
+
+/**
+ * Increment server epoch. An obd API
+ */
+static void obt_boot_epoch_update(struct obd_device *obd)
+{
+ __u32 start_epoch;
+ struct obd_device_target *obt = &obd->u.obt;
+ struct ptlrpc_request *req;
+ struct list_head client_list;
+
+ spin_lock(&obt->obt_translock);
+ start_epoch = lr_epoch(le64_to_cpu(obt->obt_last_transno)) + 1;
+ obt->obt_last_transno = cpu_to_le64((__u64)start_epoch <<
+ LR_EPOCH_BITS);
+ obt->obt_lsd->lsd_start_epoch = cpu_to_le32(start_epoch);
+ spin_unlock(&obt->obt_translock);
+
+ CFS_INIT_LIST_HEAD(&client_list);
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ list_splice_init(&obd->obd_final_req_queue, &client_list);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+
+ /**
+ * go through list of exports participated in recovery and
+ * set new epoch for them
+ */
+ list_for_each_entry(req, &client_list, rq_list) {
+ LASSERT(!req->rq_export->exp_delayed);
+ obt_client_epoch_update(req->rq_export);
+ }
+ /** return list back at once */
+ spin_lock_bh(&obd->obd_processing_task_lock);
+ list_splice_init(&client_list, &obd->obd_final_req_queue);
+ spin_unlock_bh(&obd->obd_processing_task_lock);
+ obt_server_data_update(obd, 1);
+}
+
+/**
+ * write data in last_rcvd file.
+ */
+static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
+ const struct lu_buf *buf, loff_t *off, int sync)
+{
+ struct thandle *th;
+ struct txn_param p;
+ int rc, credits;
+ ENTRY;
+
+ credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom,
+ DTO_WRITE_BLOCK);
+ txn_param_init(&p, credits);
+
+ th = dt_trans_start(env, lut->lut_bottom, &p);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
+ dt_trans_stop(env, lut->lut_bottom, th);
+
+ CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
+ "uuid = %s\nlast_transno = "LPU64"\n",
+ rc, lut->lut_lsd.lsd_uuid, lut->lut_lsd.lsd_last_transno);
+
+ RETURN(rc);
+}
+
+/**
+ * Update client data in last_rcvd
+ */
+int lut_client_data_update(const struct lu_env *env, struct lu_target *lut,
+ struct obd_export *exp)
+{
+ struct lu_export_data *led = &exp->exp_target_data;
+ struct lsd_client_data tmp_lcd;
+ loff_t tmp_off = led->led_lr_off;
+ struct lu_buf tmp_buf = {
+ .lb_buf = &tmp_lcd,
+ .lb_len = sizeof(tmp_lcd)
+ };
+ int rc = 0;
+
+ lcd_cpu_to_le(led->led_lcd, &tmp_lcd);
+ LASSERT(lut->lut_last_rcvd);
+ rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, 0);
+
+ return rc;
+}
+
+/**
+ * Update server data in last_rcvd
+ */
+static int lut_server_data_update(const struct lu_env *env,
+ struct lu_target *lut, int sync)
+{
+ struct lr_server_data tmp_lsd;
+ loff_t tmp_off = 0;
+ struct lu_buf tmp_buf = {
+ .lb_buf = &tmp_lsd,
+ .lb_len = sizeof(tmp_lsd)
+ };
+ int rc = 0;
+ ENTRY;
+
+ CDEBUG(D_SUPER,
+ "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
+ lut->lut_lsd.lsd_uuid, lut->lut_mount_count,
+ lut->lut_last_transno);
+
+ spin_lock(&lut->lut_translock);
+ lut->lut_lsd.lsd_last_transno = lut->lut_last_transno;
+ spin_unlock(&lut->lut_translock);
+
+ lsd_cpu_to_le(&lut->lut_lsd, &tmp_lsd);
+ if (lut->lut_last_rcvd != NULL)
+ rc = lut_last_rcvd_write(env, lut, &tmp_buf, &tmp_off, sync);
+ RETURN(rc);
+}
+
+void lut_client_epoch_update(const struct lu_env *env, struct lu_target *lut,
+ struct obd_export *exp)
+{
+ struct lsd_client_data *lcd = exp->exp_target_data.led_lcd;
+
+ LASSERT(lut->lut_bottom);
+ /** VBR: set client last_epoch to current epoch */
+ if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
+ return;
+ lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
+ lut_client_data_update(env, lut, exp);
+}
+
+/**
+ * Update boot epoch when recovery ends
+ */
+void lut_boot_epoch_update(struct lu_target *lut)
+{
+ struct lu_env env;
+ struct ptlrpc_request *req;
+ __u32 start_epoch;
+ struct list_head client_list;
+ int rc;
+
+ if (lut->lut_obd->obd_stopping)
+ return;
+ /** Increase server epoch after recovery */
+ if (lut->lut_bottom == NULL)
+ return obt_boot_epoch_update(lut->lut_obd);
+
+ rc = lu_env_init(&env, LCT_DT_THREAD);
+ if (rc) {
+ CERROR("Can't initialize environment rc=%i\n", rc);
+ return;
+ }
+
+ spin_lock(&lut->lut_translock);
+ start_epoch = lr_epoch(lut->lut_last_transno) + 1;
+ lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
+ lut->lut_lsd.lsd_start_epoch = start_epoch;
+ spin_unlock(&lut->lut_translock);
+
+ CFS_INIT_LIST_HEAD(&client_list);
+ /**
+ * The recovery is not yet finished and final queue can still be updated
+ * with resend requests. Move final list to separate one for processing
+ */
+ spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
+ list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
+ spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
+
+ /**
+ * go through list of exports participated in recovery and
+ * set new epoch for them
+ */
+ list_for_each_entry(req, &client_list, rq_list) {
+ LASSERT(!req->rq_export->exp_delayed);
+ lut_client_epoch_update(&env, lut, req->rq_export);
+ }
+ /** return list back at once */
+ spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
+ list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
+ spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
+ /** update server epoch */
+ lut_server_data_update(&env, lut, 1);
+ lu_env_fini(&env);
+}
+EXPORT_SYMBOL(lut_boot_epoch_update);
+
+/**
+ * commit callback, need to update last_committed value
+ */
+void lut_cb_last_committed(struct lu_target *lut, __u64 transno,
+ void *data, int err)
+{
+ struct obd_export *exp = data;
+
+ spin_lock(&lut->lut_translock);
+ if (transno > lut->lut_obd->obd_last_committed)
+ lut->lut_obd->obd_last_committed = transno;
+
+ LASSERT(exp);
+ if (!lut->lut_obd->obd_stopping &&
+ transno > exp->exp_last_committed) {
+ exp->exp_last_committed = transno;
+ spin_unlock(&lut->lut_translock);
+ ptlrpc_commit_replies(exp);
+ } else {
+ spin_unlock(&lut->lut_translock);
+ }
+ if (transno)
+ CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
+ lut->lut_obd->obd_name, transno);
+}
+EXPORT_SYMBOL(lut_cb_last_committed);
+
+void lut_cb_client(struct lu_target *lut, __u64 transno,
+ void *data, int err)
+{
+ LASSERT(lut->lut_obd);
+ target_client_add_cb(lut->lut_obd, transno, data, err);
+}
+EXPORT_SYMBOL(lut_cb_client);
+
+int lut_init(const struct lu_env *env, struct lu_target *lut,
+ struct obd_device *obd, struct dt_device *dt)
+{
+ struct lu_fid fid;
+ struct dt_object *o;
+ int rc = 0;
+ ENTRY;
+
+ lut->lut_obd = obd;
+ lut->lut_bottom = dt;
+ lut->lut_last_rcvd = NULL;
+
+ spin_lock_init(&lut->lut_translock);
+ spin_lock_init(&lut->lut_client_bitmap_lock);
+ spin_lock_init(&lut->lut_trans_table_lock);
+
+ /** obdfilter has no lu_device stack yet */
+ if (dt == NULL)
+ RETURN(rc);
+ o = dt_store_open(env, lut->lut_bottom, "", LAST_RCVD, &fid);
+ if (!IS_ERR(o)) {
+ lut->lut_last_rcvd = o;
+ } else {
+ rc = PTR_ERR(o);
+ CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
+ }
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(lut_init);
+
+void lut_fini(const struct lu_env *env, struct lu_target *lut)
+{
+ ENTRY;
+ if (lut->lut_last_rcvd)
+ lu_object_put(env, &lut->lut_last_rcvd->do_lu);
+ lut->lut_last_rcvd = NULL;
+ EXIT;
+}
+EXPORT_SYMBOL(lut_fini);
(long long)(int)offsetof(struct ptlrpc_body, pb_limit));
LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, " found %lld\n",
(long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit));
+ CLASSERT(PTLRPC_NUM_VERSIONS == 4);
+ LASSERTF((int)offsetof(struct ptlrpc_body, pb_pre_versions[4]) == 120, " found %lld\n",
+ (long long)(int)offsetof(struct ptlrpc_body, pb_pre_versions[4]));
+ LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4]) == 8, " found %lld\n",
+ (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4]));
/* Checks for struct obd_connect_data */
LASSERTF((int)sizeof(struct obd_connect_data) == 72, " found %lld\n",
CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL);
CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL);
+ CLASSERT(OBD_CONNECT_VBR == 0x80000000ULL);
CLASSERT(OBD_CONNECT_SKIP_ORPHAN == 0x400000000ULL);
/* Checks for struct obdo */
noinst_SCRIPTS += conf-sanity.sh insanity.sh lfscktest.sh oos.sh oos2.sh
noinst_SCRIPTS += llog-test.sh recovery-small.sh replay-dual.sh sanity-quota.sh
noinst_SCRIPTS += replay-ost-single.sh replay-single.sh run-llog.sh sanityN.sh
-noinst_SCRIPTS += runracer
+noinst_SCRIPTS += runracer replay-vbr.sh
noinst_SCRIPTS += performance-sanity.sh mdsrate-create-small.sh
noinst_SCRIPTS += mdsrate-create-large.sh mdsrate-lookup-1dir.sh
noinst_SCRIPTS += mdsrate-stat-small.sh mdsrate-stat-large.sh
#test race llog recovery thread vs llog cleanup
test_61a() { # was test_61
remote_ost_nodsh && skip "remote OST with nodsh" && return 0
-
+
mkdir -p $DIR/$tdir
createmany -o $DIR/$tdir/$tfile-%d 800
- replay_barrier ost1
-# OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221
- unlinkmany $DIR/$tdir/$tfile-%d 800
+ replay_barrier ost1
+# OBD_FAIL_OST_LLOG_RECOVERY_TIMEOUT 0x221
+ unlinkmany $DIR/$tdir/$tfile-%d 800
set_nodes_failloc "$(osts_nodes)" 0x80000221
facet_failover ost1
- sleep 10
+ sleep 10
fail ost1
sleep 30
set_nodes_failloc "$(osts_nodes)" 0x0
-
+
$CHECKSTAT -t file $DIR/$tdir/$tfile-* && return 1
rmdir $DIR/$tdir
}
test_61b() {
# OBD_FAIL_MDS_LLOG_SYNC_TIMEOUT 0x13a
do_facet $SINGLEMDS "lctl set_param fail_loc=0x8000013a"
- facet_failover $SINGLEMDS
+ facet_failover $SINGLEMDS
sleep 10
fail $SINGLEMDS
do_facet client dd if=/dev/zero of=$DIR/$tfile bs=4k count=1 || return 1
test_61c() {
remote_ost_nodsh && skip "remote OST with nodsh" && return 0
-# OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222
- touch $DIR/$tfile
+# OBD_FAIL_OST_CANCEL_COOKIE_TIMEOUT 0x222
+ touch $DIR/$tfile
set_nodes_failloc "$(osts_nodes)" 0x80000222
- rm $DIR/$tfile
+ rm $DIR/$tfile
sleep 10
fail ost1
set_nodes_failloc "$(osts_nodes)" 0x0
run_test 70b "mds recovery; $CLIENTCOUNT clients"
# end multi-client tests
+# Open+unlink a file, then fail the MDS with OBD_FAIL_LDLM_ENQUEUE injected so
+# the client reconnects before open replay; the orphan must be gone after close.
+test_73a() {
+ multiop_bg_pause $DIR/$tfile O_tSc || return 3
+ pid=$!
+ rm -f $DIR/$tfile
+
+ replay_barrier $SINGLEMDS
+#define OBD_FAIL_LDLM_ENQUEUE 0x302
+ do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000302"
+ fail $SINGLEMDS
+ kill -USR1 $pid
+ wait $pid || return 1
+ [ -e $DIR/$tfile ] && return 2
+ return 0
+}
+run_test 73a "open(O_CREAT), unlink, replay, reconnect before open replay , close"
+
+# Same as 73a but inject OBD_FAIL_LDLM_REPLY: reconnect happens at the
+# open-replay reply; the unlinked file must not reappear.
+test_73b() {
+ multiop_bg_pause $DIR/$tfile O_tSc || return 3
+ pid=$!
+ rm -f $DIR/$tfile
+
+ replay_barrier $SINGLEMDS
+#define OBD_FAIL_LDLM_REPLY 0x30c
+ do_facet $SINGLEMDS "lctl set_param fail_loc=0x8000030c"
+ fail $SINGLEMDS
+ kill -USR1 $pid
+ wait $pid || return 1
+ [ -e $DIR/$tfile ] && return 2
+ return 0
+}
+run_test 73b "open(O_CREAT), unlink, replay, reconnect at open_replay reply, close"
+
+# Same as 73a but inject OBD_FAIL_TGT_LAST_REPLAY: reconnect happens at the
+# LAST_REPLAY stage; the unlinked file must not reappear.
+test_73c() {
+ multiop_bg_pause $DIR/$tfile O_tSc || return 3
+ pid=$!
+ rm -f $DIR/$tfile
+
+ replay_barrier $SINGLEMDS
+#define OBD_FAIL_TGT_LAST_REPLAY 0x710
+ do_facet $SINGLEMDS "lctl set_param fail_loc=0x80000710"
+ fail $SINGLEMDS
+ kill -USR1 $pid
+ wait $pid || return 1
+ [ -e $DIR/$tfile ] && return 2
+ return 0
+}
+run_test 73c "open(O_CREAT), unlink, replay, reconnect at last_replay, close"
+
test_80a() {
[ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
--- /dev/null
+#!/bin/bash
+
+set -e
+
+# bug number: 16356
+ALWAYS_EXCEPT="2 $REPLAY_VBR_EXCEPT"
+
+SAVE_PWD=$PWD
+PTLDEBUG=${PTLDEBUG:--1}
+LUSTRE=${LUSTRE:-`dirname $0`/..}
+SETUP=${SETUP:-""}
+CLEANUP=${CLEANUP:-""}
+. $LUSTRE/tests/test-framework.sh
+
+init_test_env $@
+
+. ${CONFIG:=$LUSTRE/tests/cfg/$NAME.sh}
+
+# Version Based Recovery tests need at least two clients and a local MDS.
+[ -n "$CLIENTS" ] || { skip "Need two or more clients" && exit 0; }
+[ $CLIENTCOUNT -ge 2 ] || \
+ { skip "Need two or more clients, have $CLIENTCOUNT" && exit 0; }
+remote_mds_nodsh && skip "remote MDS with nodsh" && exit 0
+
+[ "$SLOW" = "no" ] && EXCEPT_SLOW=""
+
+
+# NOTE(review): the next line is a no-op, and MOUNT_2 is cleared
+# unconditionally right below -- both look like candidates for removal.
+[ ! "$NAME" = "ncli" ] && ALWAYS_EXCEPT="$ALWAYS_EXCEPT"
+[ "$NAME" = "ncli" ] && MOUNT_2=""
+MOUNT_2=""
+build_test_filter
+
+check_and_setup_lustre
+rm -rf $DIR/[df][0-9]*
+
+[ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE
+
+# Tests manage their own client mounts, so start with clients unmounted.
+[ "$CLIENTS" ] && zconf_umount_clients $CLIENTS $DIR
+
+# A client that unmounts before failover must not block replay for the
+# surviving client; its own uncommitted create is expected to be lost.
+test_1() {
+ echo "mount client $CLIENT1,$CLIENT2..."
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ do_node $CLIENT2 mkdir -p $DIR/$tdir
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+ do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 1
+ do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+ zconf_umount $CLIENT2 $DIR
+
+ facet_failover $SINGLEMDS
+ # recovery shouldn't fail due to missing client 2
+ do_node $CLIENT1 df $DIR || return 1
+
+ # All 50 files should have been replayed
+ do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+ do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+
+ zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
+ [ -e $DIR/$tdir/$tfile-2-0 ] && error "$tfile-2-0 exists"
+
+ zconf_umount_clients $CLIENTS $DIR
+ return 0
+}
+run_test 1 "lost client doesn't affect another during replay"
+
+# Client1 reads data created by client2; client2 then disappears before
+# failover, so its create is lost but client1's replay must still succeed.
+test_2() {
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ do_node $CLIENT2 mkdir -p $DIR/$tdir
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT2 mcreate $DIR/$tdir/$tfile
+ do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+ #client1 read data from client2 which will be lost
+ do_node $CLIENT1 $CHECKSTAT $DIR/$tdir/$tfile
+ do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+ zconf_umount $CLIENT2 $DIR
+
+ facet_failover $SINGLEMDS
+ # recovery shouldn't fail due to missing client 2
+ do_node $CLIENT1 df $DIR || return 1
+
+ # All 50 files should have been replayed
+ do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+ do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+ do_node $CLIENT1 $CHECKSTAT $DIR/$tdir/$tfile && return 4
+
+ zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
+
+ zconf_umount_clients $CLIENTS $DIR
+ return 0
+}
+run_test 2 "lost data due to missed REMOTE client during replay"
+
+# Time/size setattr must not bump the object version, so losing client2's
+# touch does not break replay of client1's operations.
+test_3a() {
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ #make sure the time will change
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.atime_diff=0" || return
+ do_node $CLIENT1 touch $DIR/$tfile
+ do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
+ sleep 1
+ replay_barrier $SINGLEMDS
+ #change time
+ do_node $CLIENT2 touch $DIR/$tfile
+ do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
+ #another change
+ do_node $CLIENT1 touch $DIR/$tfile
+ #remove file
+ do_node $CLIENT1 rm $DIR/$tfile
+ zconf_umount $CLIENT2 $DIR
+
+ facet_failover $SINGLEMDS
+ # recovery shouldn't fail due to missing client 2
+ do_node $CLIENT1 df $DIR || return 1
+ do_node $CLIENT1 $CHECKSTAT $DIR/$tfile && return 2
+
+ zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
+
+ zconf_umount_clients $CLIENTS $DIR
+
+ return 0
+}
+run_test 3a "setattr of time/size doesn't change version"
+
+# Permission setattr DOES bump the version, so losing client2's chmod must
+# make version-based recovery fail for client1.
+test_3b() {
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ #make sure the time will change
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL set_param mdd.${!var}.atime_diff=0" || return
+
+ do_node $CLIENT1 touch $DIR/$tfile
+ do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
+ sleep 1
+ replay_barrier $SINGLEMDS
+ #change mode
+ do_node $CLIENT2 chmod +x $DIR/$tfile
+ do_node $CLIENT2 $CHECKSTAT $DIR/$tfile
+ #another chmod
+ do_node $CLIENT1 chmod -x $DIR/$tfile
+ zconf_umount $CLIENT2 $DIR
+
+ facet_failover $SINGLEMDS
+ # recovery should fail due to missing client 2
+ do_node $CLIENT1 df $DIR && return 1
+
+ do_node $CLIENT1 $CHECKSTAT -p 755 $DIR/$tfile && return 2
+ zconf_mount $CLIENT2 $DIR || error "mount $CLIENT2 $DIR fail"
+
+ zconf_umount_clients $CLIENTS $DIR
+
+ return 0
+}
+run_test 3b "setattr of permissions changes version"
+
+# Set fail_loc 0x50d on the client so it stays out of normal recovery
+# (simulates a delayed client for VBR tests).
+vbr_deactivate_client() {
+ local client=$1
+ echo "Deactivating client $client";
+ do_node $client "sysctl -w lustre.fail_loc=0x50d"
+}
+
+# Clear the fail_loc so the client may rejoin (delayed) recovery.
+vbr_activate_client() {
+ local client=$1
+ echo "Activating client $client";
+ do_node $client "sysctl -w lustre.fail_loc=0x0"
+}
+
+# True when the given node runs neither an MDT nor an OST, i.e. it is a
+# pure client and can be power-cycled without killing a server.
+remote_server ()
+{
+ local client=$1
+ [ -z "$(do_node $client lctl dl | grep mdt)" ] && \
+ [ -z "$(do_node $client lctl dl | grep ost)" ]
+}
+
+# Delayed recovery: client2 sits out the first recovery window, then joins
+# later; its 25 creates must still be replayed.
+test_4a() {
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+ remote_server $CLIENT2 || \
+ { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ do_node $CLIENT2 mkdir -p $DIR/$tdir
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+ do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
+ do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+ vbr_deactivate_client $CLIENT2
+
+ facet_failover $SINGLEMDS
+ do_node $CLIENT1 df $DIR || return 1
+
+ # All 50 files should have been replayed
+ do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+ do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+
+ vbr_activate_client $CLIENT2
+ do_node $CLIENT2 df $DIR || return 4
+ # All 25 files from client2 should have been replayed
+ do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
+
+ zconf_umount_clients $CLIENTS $DIR
+ return 0
+}
+run_test 4a "fail MDS, delayed recovery"
+
+# Delayed open recovery: normal operation proceeds (new creates) while
+# client2 is still out; its delayed replay must not lose anything.
+test_4b(){
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+ remote_server $CLIENT2 || \
+ { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+ do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
+ vbr_deactivate_client $CLIENT2
+
+ facet_failover $SINGLEMDS
+ do_node $CLIENT1 df $DIR || return 1
+
+ # create another set of files
+ do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+
+ vbr_activate_client $CLIENT2
+ do_node $CLIENT2 df $DIR || return 2
+
+ # All files should have been replayed
+ do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3
+ do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 4
+ do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
+
+ zconf_umount_clients $CLIENTS $DIR
+}
+run_test 4b "fail MDS, normal operation, delayed open recovery"
+
+# Same as 4b but with mknod-style creates (createmany -m) instead of opens.
+test_4c() {
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+ remote_server $CLIENT2 || \
+ { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT1 createmany -m $DIR/$tfile- 25
+ do_node $CLIENT2 createmany -m $DIR/$tdir/$tfile-2- 25
+ vbr_deactivate_client $CLIENT2
+
+ facet_failover $SINGLEMDS
+ do_node $CLIENT1 df $DIR || return 1
+
+ # create another set of files
+ do_node $CLIENT1 createmany -m $DIR/$tfile-3- 25
+
+ vbr_activate_client $CLIENT2
+ do_node $CLIENT2 df $DIR || return 2
+
+ # All files should have been replayed
+ do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3
+ do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 4
+ do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
+
+ zconf_umount_clients $CLIENTS $DIR
+}
+run_test 4c "fail MDS, normal operation, delayed recovery"
+
+# When client1's replay depends on a version set by the missing client2,
+# client1's recovery must fail; client2's delayed replay still succeeds.
+test_5a() {
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+ remote_server $CLIENT2 || \
+ { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+ do_node $CLIENT2 createmany -o $DIR/$tfile-2- 1
+ do_node $CLIENT1 createmany -o $DIR/$tfile-3- 1
+ vbr_deactivate_client $CLIENT2
+
+ facet_failover $SINGLEMDS
+ # recovery must fail for client1: client2's create is missing
+ do_node $CLIENT1 df $DIR && return 1
+
+ vbr_activate_client $CLIENT2
+ do_node $CLIENT2 df $DIR || return 2
+
+ # First 25 files should have been replayed
+ do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 3
+ # Third file is failed due to missed client2
+ do_node $CLIENT1 $CHECKSTAT $DIR/$tfile-3-0 && error "$tfile-3-0 exists"
+ # file from client2 should exists
+ do_node $CLIENT2 unlinkmany $DIR/$tfile-2- 1 || return 4
+
+ zconf_umount_clients $CLIENTS $DIR
+}
+run_test 5a "fail MDS, delayed recovery should fail"
+
+# After normal operation resumes, the delayed client2 must be rejected
+# (its export is stale) and its create must be lost; client1 is unaffected.
+test_5b() {
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+ remote_server $CLIENT2 || \
+ { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+ do_node $CLIENT2 createmany -o $DIR/$tfile-2- 1
+ vbr_deactivate_client $CLIENT2
+
+ facet_failover $SINGLEMDS
+ do_node $CLIENT1 df $DIR || return 1
+ do_node $CLIENT1 $CHECKSTAT $DIR/$tfile-2-0 && error "$tfile-2-0 exists"
+
+ # create another set of files
+ do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+
+ vbr_activate_client $CLIENT2
+ do_node $CLIENT2 df $DIR && return 4
+ # file from client2 should fail
+ do_node $CLIENT2 $CHECKSTAT $DIR/$tfile-2-0 && error "$tfile-2-0 exists"
+
+ # All 50 files from client 1 should have been replayed
+ do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+ do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+
+ zconf_umount_clients $CLIENTS $DIR
+}
+run_test 5b "fail MDS, normal operation, delayed recovery should fail"
+
+# Interrupt a delayed recovery mid-replay (only 5 requests replayed via
+# OBD_FAIL_PTLRPC_REPLAY) and fail the MDS again; nothing may be lost.
+test_6a() {
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+ remote_server $CLIENT2 || \
+ { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ do_node $CLIENT2 mkdir -p $DIR/$tdir
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+ do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
+ do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+ vbr_deactivate_client $CLIENT2
+
+ facet_failover $SINGLEMDS
+ # replay only 5 requests
+ do_node $CLIENT2 "sysctl -w lustre.fail_val=5"
+#define OBD_FAIL_PTLRPC_REPLAY 0x50e
+ do_node $CLIENT2 "sysctl -w lustre.fail_loc=0x2000050e"
+ do_node $CLIENT2 df $DIR
+ # vbr_activate_client $CLIENT2
+ # need way to know that client stops replays
+ sleep 5
+
+ facet_failover $SINGLEMDS
+ do_node $CLIENT1 df $DIR || return 1
+
+ # All files should have been replayed
+ do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+ do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+ do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
+
+ zconf_umount_clients $CLIENTS $DIR
+ return 0
+}
+run_test 6a "fail MDS, delayed recovery, fail MDS"
+
+# Complete a delayed recovery, then fail the MDS a second time; all files
+# from both clients must survive both recoveries.
+# NOTE(review): run_test description duplicates test 6a's -- consider
+# renaming to distinguish "second failover after delayed recovery".
+test_7a() {
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+ remote_server $CLIENT2 || \
+ { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ do_node $CLIENT2 mkdir -p $DIR/$tdir
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT1 createmany -o $DIR/$tfile- 25
+ do_node $CLIENT2 createmany -o $DIR/$tdir/$tfile-2- 25
+ do_node $CLIENT1 createmany -o $DIR/$tfile-3- 25
+ vbr_deactivate_client $CLIENT2
+
+ facet_failover $SINGLEMDS
+ vbr_activate_client $CLIENT2
+ do_node $CLIENT2 df $DIR || return 4
+
+ facet_failover $SINGLEMDS
+ do_node $CLIENT1 df $DIR || return 1
+
+ # All files should have been replayed
+ do_node $CLIENT1 unlinkmany $DIR/$tfile- 25 || return 2
+ do_node $CLIENT1 unlinkmany $DIR/$tfile-3- 25 || return 3
+ do_node $CLIENT2 unlinkmany $DIR/$tdir/$tfile-2- 25 || return 5
+
+ zconf_umount_clients $CLIENTS $DIR
+ return 0
+}
+run_test 7a "fail MDS, delayed recovery, fail MDS"
+
+# Start multiop in the background on a remote client and export its pid as
+# ${client}_multiop_pid (and the pdsh wrapper pid as ${client}_do_node_pid).
+rmultiop_start() {
+ local client=$1
+ local file=$2
+
+ # We need to run do_node in bg, because pdsh does not exit
+ # if child process of run script exists.
+ # I.e. pdsh does not exit when runmultiop_bg_pause exited,
+ # because of multiop_bg_pause -> $MULTIOP_PROG &
+ # By the same reason we need sleep a bit after do_nodes starts
+ # to let runmultiop_bg_pause start muliop and
+ # update /tmp/multiop_bg.pid ;
+ # The rm /tmp/multiop_bg.pid guarantees here that
+ # we have the updated by runmultiop_bg_pause
+ # /tmp/multiop_bg.pid file
+
+ local pid_file=$TMP/multiop_bg.pid.$$
+ do_node $client "rm -f $pid_file && MULTIOP_PID_FILE=$pid_file LUSTRE= runmultiop_bg_pause $file O_tSc" &
+ local pid=$!
+ sleep 3
+ local multiop_pid
+ multiop_pid=$(do_node $client cat $pid_file)
+ [ -n "$multiop_pid" ] || error "$client : Can not get multiop_pid from $pid_file "
+ eval export ${client}_multiop_pid=$multiop_pid
+ eval export ${client}_do_node_pid=$pid
+ local var=${client}_multiop_pid
+ echo client $client multiop_bg started multiop_pid=${!var}
+ # NOTE(review): $? here is echo's status, i.e. effectively "return 0"
+ return $?
+}
+
+# Signal the remote multiop (started by rmultiop_start) to close and exit,
+# then reap the local pdsh wrapper process.
+rmultiop_stop() {
+ local client=$1
+ local multiop_pid=${client}_multiop_pid
+ local do_node_pid=${client}_do_node_pid
+
+ echo "Stopping multiop_pid=${!multiop_pid} (kill ${!multiop_pid} on $client)"
+ do_node $client kill -USR1 ${!multiop_pid}
+
+ wait ${!do_node_pid} || true
+}
+
+# An orphan kept open by the delayed client2 must survive recovery until
+# client2 finishes delayed replay, then disappear.
+test_8a() {
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+ remote_server $CLIENT2 || \
+ { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ rmultiop_start $CLIENT2 $DIR/$tfile || return 1
+ do_node $CLIENT2 rm -f $DIR/$tfile
+ replay_barrier $SINGLEMDS
+ rmultiop_stop $CLIENT2 || return 2
+
+ vbr_deactivate_client $CLIENT2
+ facet_failover $SINGLEMDS
+ do_node $CLIENT1 df $DIR || return 3
+ #client1 is back and will try to open orphan
+ vbr_activate_client $CLIENT2
+ do_node $CLIENT2 df $DIR || return 4
+
+ do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
+ zconf_umount_clients $CLIENTS $DIR
+ return 0
+}
+run_test 8a "orphans are kept until delayed recovery"
+
+# open1 | unlink2 X delayed_replay1, close1: the open from the delayed
+# client must be replayable even after the other client unlinked the file.
+test_8b() {
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+ remote_server $CLIENT2 || \
+ { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ rmultiop_start $CLIENT2 $DIR/$tfile || return 1
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT1 rm -f $DIR/$tfile
+
+ vbr_deactivate_client $CLIENT2
+ facet_failover $SINGLEMDS
+ do_node $CLIENT1 df $DIR || return 2
+ #client1 is back and will try to open orphan
+ vbr_activate_client $CLIENT2
+ do_node $CLIENT2 df $DIR || return 3
+
+ rmultiop_stop $CLIENT2 || return 1
+ do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
+ zconf_umount_clients $CLIENTS $DIR
+ return 0
+}
+run_test 8b "open1 | unlink2 X delayed_replay1, close1"
+
+# open1 | unlink2, close1 X delayed_replay1: close happens before the
+# failover; delayed replay must still leave no orphan behind.
+test_8c() {
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+ remote_server $CLIENT2 || \
+ { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ rmultiop_start $CLIENT2 $DIR/$tfile || return 1
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT1 rm -f $DIR/$tfile
+ rmultiop_stop $CLIENT2 || return 2
+
+ vbr_deactivate_client $CLIENT2
+ facet_failover $SINGLEMDS
+ do_node $CLIENT1 df $DIR || return 3
+ #client1 is back and will try to open orphan
+ vbr_activate_client $CLIENT2
+ do_node $CLIENT2 df $DIR || return 4
+
+ do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
+ zconf_umount_clients $CLIENTS $DIR
+ return 0
+}
+run_test 8c "open1 | unlink2, close1 X delayed_replay1"
+
+# Both clients hold the file open, one unlinks, both close before failover;
+# after delayed replay of client2 the file must be fully gone.
+test_8d() {
+ local var=${SINGLEMDS}_svc
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+
+ remote_server $CLIENT2 || \
+ { skip "Client $CLIENT2 is on the server node" && return 0; }
+
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ rmultiop_start $CLIENT1 $DIR/$tfile || return 1
+ rmultiop_start $CLIENT2 $DIR/$tfile || return 2
+ replay_barrier $SINGLEMDS
+ do_node $CLIENT1 rm -f $DIR/$tfile
+ rmultiop_stop $CLIENT2 || return 3
+ rmultiop_stop $CLIENT1 || return 4
+
+ vbr_deactivate_client $CLIENT2
+ facet_failover $SINGLEMDS
+ do_node $CLIENT1 df $DIR || return 6
+
+ #client1 is back and will try to open orphan
+ vbr_activate_client $CLIENT2
+ do_node $CLIENT2 df $DIR || return 8
+
+ do_node $CLIENT2 $CHECKSTAT $DIR/$tfile && error "$tfile exists"
+ zconf_umount_clients $CLIENTS $DIR
+ return 0
+}
+run_test 8d "open1, open2 | unlink2, close1, close2 X delayed_replay1"
+
+# create | unlink, create: client2's unlink+recreate of $tfile must survive
+# version-based recovery even though client1 (umounted) misses its replay.
+test_8e() {
+ zconf_mount $CLIENT1 $DIR
+ zconf_mount $CLIENT2 $DIR
+
+ do_node $CLIENT1 mcreate $DIR/$tfile
+ do_node $CLIENT1 mkdir $DIR/$tfile-2
+ replay_barrier $SINGLEMDS
+ # missed replay from client1 will lead to recovery by versions
+ do_node $CLIENT1 touch $DIR/$tfile-2/$tfile
+ do_node $CLIENT2 rm $DIR/$tfile || return 1
+ do_node $CLIENT2 touch $DIR/$tfile || return 2
+
+ zconf_umount $CLIENT1 $DIR
+ facet_failover $SINGLEMDS
+ do_node $CLIENT2 df $DIR || return 6
+
+ # the recreated file must still be there after recovery
+ do_node $CLIENT2 rm $DIR/$tfile || error "$tfile doesn't exist"
+ zconf_umount_clients $CLIENTS $DIR
+ return 0
+}
+run_test 8e "create | unlink, create shouldn't fail"
+
+# Same as 8e but the recreate uses mcreate; the file must survive recovery.
+test_8f() {
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ do_node $CLIENT1 touch $DIR/$tfile
+ do_node $CLIENT1 mkdir $DIR/$tfile-2
+ replay_barrier $SINGLEMDS
+ # missed replay from client1 will lead to recovery by versions
+ do_node $CLIENT1 touch $DIR/$tfile-2/$tfile
+ do_node $CLIENT2 rm -f $DIR/$tfile || return 1
+ do_node $CLIENT2 mcreate $DIR/$tfile || return 2
+
+ zconf_umount $CLIENT1 $DIR
+ facet_failover $SINGLEMDS
+ do_node $CLIENT2 df $DIR || return 6
+
+ # the recreated file must still be there after recovery
+ do_node $CLIENT2 rm $DIR/$tfile || error "$tfile doesn't exist"
+ zconf_umount $CLIENT2 $DIR
+ return 0
+}
+run_test 8f "create | unlink, create shouldn't fail"
+
+test_8g() {
+ zconf_mount_clients $CLIENT1 $DIR
+ zconf_mount_clients $CLIENT2 $DIR
+
+ do_node $CLIENT1 touch $DIR/$tfile
+ do_node $CLIENT1 mkdir $DIR/$tfile-2
+ replay_barrier $SINGLEMDS
+ # missed replay from client1 will lead to recovery by versions
+ do_node $CLIENT1 touch $DIR/$tfile-2/$tfile
+ do_node $CLIENT2 rm -f $DIR/$tfile || return 1
+ do_node $CLIENT2 mkdir $DIR/$tfile || return 2
+
+ zconf_umount $CLIENT1 $DIR
+ facet_failover $SINGLEMDS
+ do_node $CLIENT2 df $DIR || return 6
+
+ do_node $CLIENT2 rmdir $DIR/$tfile || error "$tfile doesn't exists"
+ zconf_umount $CLIENT2 $DIR
+ return 0
+}
+run_test 8g "create | unlink, create shouldn't fail"
+
+# VBR under dbench load on all clients: deactivate CLIENT2, fail the MDS,
+# then let CLIENT2 finish delayed recovery while the load completes.
+test_10 () {
+ local var=${SINGLEMDS}_svc
+ # Skip when the MDS lacks delayed-recovery support; all sibling tests
+ # use this same probe with "[ $? -ne 0 ]" (the original test had the
+ # condition inverted and skipped exactly when support WAS present).
+ do_facet $SINGLEMDS "$LCTL get_param -n mdd.${!var}.stale_export_age" > /dev/null 2>&1
+ [ $? -ne 0 ] && { skip "No delayed recovery support" && return; }
+ [ -z "$DBENCH_LIB" ] && skip "DBENCH_LIB is not set" && return 0
+
+ zconf_mount_clients $CLIENTS $DIR
+
+ local duration="-t 60"
+ local cmd="rundbench 1 $duration "
+ local PID=""
+ for CLIENT in ${CLIENTS//,/ }; do
+ $PDSH $CLIENT "set -x; PATH=:$PATH:$LUSTRE/utils:$LUSTRE/tests/:${DBENCH_LIB} DBENCH_LIB=${DBENCH_LIB} $cmd" &
+ PID=$!
+ echo $PID >pid.$CLIENT
+ echo "Started load PID=`cat pid.$CLIENT`"
+ done
+
+ replay_barrier $SINGLEMDS
+ sleep 3 # give clients a time to do operations
+
+ vbr_deactivate_client $CLIENT2
+
+ log "$TESTNAME fail $SINGLEMDS 1"
+ fail $SINGLEMDS
+
+# wait for client to reconnect to MDS
+ sleep $TIMEOUT
+
+ vbr_activate_client $CLIENT2
+ do_node $CLIENT2 df $DIR || return 4
+
+ for CLIENT in ${CLIENTS//,/ }; do
+ PID=`cat pid.$CLIENT`
+ wait $PID
+ rc=$?
+ echo "load on ${CLIENT} returned $rc"
+ # clean up the per-client pid file
+ rm -f pid.$CLIENT
+ done
+
+ zconf_umount_clients $CLIENTS $DIR
+}
+run_test 10 "mds version recovery; $CLIENTCOUNT clients"
+
+# Final banner and teardown; exit non-zero if any test logged a FAIL.
+equals_msg `basename $0`: test complete, cleaning up
+#SLEEP=$((`date +%s` - $NOW))
+#[ $SLEEP -lt $TIMEOUT ] && sleep $SLEEP
+check_and_cleanup_lustre
+[ -f "$TESTSUITELOG" ] && cat $TESTSUITELOG && grep -q FAIL $TESTSUITELOG && exit 1 || true
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
lru_resize_enable mdc
lru_resize_enable osc
+# rm -rf $DIR/$tdir
}
run_test 120a "Early Lock Cancel: mkdir test"
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
lru_resize_enable mdc
lru_resize_enable osc
+# rm -rf $DIR/$tdir
}
run_test 120b "Early Lock Cancel: create test"
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
lru_resize_enable mdc
lru_resize_enable osc
+# rm -rf $DIR/$tdir
}
run_test 120c "Early Lock Cancel: link test"
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
lru_resize_enable mdc
lru_resize_enable osc
+# rm -rf $DIR/$tdir
}
run_test 120d "Early Lock Cancel: setattr test"
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
lru_resize_enable mdc
lru_resize_enable osc
+# rm -rf $DIR/$tdir
}
run_test 120e "Early Lock Cancel: unlink test"
[ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPC occured."
lru_resize_enable mdc
lru_resize_enable osc
+# rm -rf $DIR/$tdir
}
run_test 120f "Early Lock Cancel: rename test"
# wait for commitment of removal
lru_resize_enable mdc
lru_resize_enable osc
+# rm -rf $DIR/$tdir
}
run_test 120g "Early Lock Cancel: performance test"
CHECK_MEMBER(ptlrpc_body, pb_service_time);
CHECK_MEMBER(ptlrpc_body, pb_slv);
CHECK_MEMBER(ptlrpc_body, pb_limit);
+ CHECK_CVALUE(PTLRPC_NUM_VERSIONS);
+ CHECK_MEMBER(ptlrpc_body, pb_pre_versions[PTLRPC_NUM_VERSIONS]);
}
static void check_obd_connect_data(void)
CHECK_CDEFINE(OBD_CONNECT_AT);
CHECK_CDEFINE(OBD_CONNECT_CANCELSET);
CHECK_CDEFINE(OBD_CONNECT_LRU_RESIZE);
+ CHECK_CDEFINE(OBD_CONNECT_VBR);
CHECK_CDEFINE(OBD_CONNECT_SKIP_ORPHAN);
}
(long long)(int)offsetof(struct ptlrpc_body, pb_limit));
LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, " found %lld\n",
(long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit));
+ CLASSERT(PTLRPC_NUM_VERSIONS == 4);
+ LASSERTF((int)offsetof(struct ptlrpc_body, pb_pre_versions[4]) == 120, " found %lld\n",
+ (long long)(int)offsetof(struct ptlrpc_body, pb_pre_versions[4]));
+ LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4]) == 8, " found %lld\n",
+ (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_pre_versions[4]));
/* Checks for struct obd_connect_data */
LASSERTF((int)sizeof(struct obd_connect_data) == 72, " found %lld\n",
CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL);
CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL);
+ CLASSERT(OBD_CONNECT_VBR == 0x80000000ULL);
CLASSERT(OBD_CONNECT_SKIP_ORPHAN == 0x400000000ULL);
/* Checks for struct obdo */