From 4f53536d002c13886210b672b657795baa067144 Mon Sep 17 00:00:00 2001 From: Wang Di Date: Thu, 28 Aug 2014 02:15:32 -0700 Subject: [PATCH] LU-3540 lod: update recovery thread start recovery thread for each sub device in LOD, and these threads will retrieve the update log from remote MDT and attach them to recovery list. Add update records to the recovery list, which is attached to lu_target, and the recovery thread (target_recover_thread()) will choose update or request by transno during recovery. Add update_recovery.c to handle updates replay during recovery. Change-Id: I367e9ef88f010143df50f7d06bd4242db1105db7 Signed-off-by: Wang Di Reviewed-on: http://review.whamcloud.com/11737 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: Alex Zhuravlev Reviewed-by: Oleg Drokin --- lustre/fid/fid_request.c | 3 + lustre/include/dt_object.h | 6 + lustre/include/lu_object.h | 38 ++ lustre/include/lu_target.h | 57 ++ lustre/include/lustre/lustre_idl.h | 3 + lustre/include/lustre_fld.h | 1 + lustre/include/lustre_update.h | 73 +++ lustre/include/obd.h | 3 +- lustre/ldlm/ldlm_lib.c | 488 +++++++++++---- lustre/lod/lod_dev.c | 130 +++- lustre/lod/lod_internal.h | 13 +- lustre/lod/lod_lov.c | 7 + lustre/lod/lod_sub_object.c | 2 +- lustre/mdt/mdt_handler.c | 2 + lustre/mgs/mgs_internal.h | 5 - lustre/obdclass/llog_swab.c | 35 ++ lustre/ofd/ofd_obd.c | 1 + lustre/osp/osp_dev.c | 50 +- lustre/osp/osp_trans.c | 6 +- lustre/ptlrpc/Makefile.in | 1 + lustre/target/Makefile.am | 1 + lustre/target/out_handler.c | 790 +------------------------ lustre/target/out_lib.c | 798 +++++++++++++++++++++++++ lustre/target/tgt_internal.h | 191 +++--- lustre/target/update_records.c | 15 +- lustre/target/update_recovery.c | 1144 ++++++++++++++++++++++++++++++++++++ lustre/target/update_trans.c | 109 ++-- lustre/tests/conf-sanity.sh | 2 +- lustre/tests/sanity.sh | 9 +- 29 files changed, 2933 insertions(+), 1050 deletions(-) create mode 100644 lustre/target/update_recovery.c diff --git a/lustre/fid/fid_request.c b/lustre/fid/fid_request.c index d4b2555..528bed7 100644 --- a/lustre/fid/fid_request.c +++ b/lustre/fid/fid_request.c @@ -110,6 +110,9 @@ static int seq_client_rpc(struct lu_client_seq *seq, debug_mask = D_INFO; } + /* Allow seq client RPC during recovery time. */ + req->rq_allow_replay = 1; + ptlrpc_at_set_req_timeout(req); if (opc != SEQ_ALLOC_SUPER && seq->lcs_type == LUSTRE_SEQ_METADATA) diff --git a/lustre/include/dt_object.h b/lustre/include/dt_object.h index 4921873..4b98848 100644 --- a/lustre/include/dt_object.h +++ b/lustre/include/dt_object.h @@ -1779,6 +1779,12 @@ static inline struct dt_object *lu2dt_obj(struct lu_object *o) return container_of0(o, struct dt_object, do_lu); } +static inline struct dt_object *dt_object_child(struct dt_object *o) +{ + return container_of0(lu_object_next(&(o)->do_lu), + struct dt_object, do_lu); +} + /** * This is the general purpose transaction handle. * 1. Transaction Life Cycle diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index e40e826..d67d8fc 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -439,6 +439,44 @@ struct lu_attr { __u64 la_valid; }; +static inline void lu_attr_cpu_to_le(struct lu_attr *dst_attr, + struct lu_attr *src_attr) +{ + dst_attr->la_size = cpu_to_le64(src_attr->la_size); + dst_attr->la_mtime = cpu_to_le64(src_attr->la_mtime); + dst_attr->la_atime = cpu_to_le64(src_attr->la_atime); + dst_attr->la_ctime = cpu_to_le64(src_attr->la_ctime); + dst_attr->la_blocks = cpu_to_le64(src_attr->la_blocks); + dst_attr->la_mode = cpu_to_le32(src_attr->la_mode); + dst_attr->la_uid = cpu_to_le32(src_attr->la_uid); + dst_attr->la_gid = cpu_to_le32(src_attr->la_gid); + dst_attr->la_flags = cpu_to_le32(src_attr->la_flags); + dst_attr->la_nlink = cpu_to_le32(src_attr->la_nlink); + dst_attr->la_blkbits = cpu_to_le32(src_attr->la_blkbits); + dst_attr->la_blksize = cpu_to_le32(src_attr->la_blksize); + dst_attr->la_rdev = cpu_to_le32(src_attr->la_rdev); + dst_attr->la_valid = cpu_to_le64(src_attr->la_valid); +} + +static inline void lu_attr_le_to_cpu(struct lu_attr *dst_attr, + struct lu_attr *src_attr) +{ + dst_attr->la_size = le64_to_cpu(src_attr->la_size); + dst_attr->la_mtime = le64_to_cpu(src_attr->la_mtime); + dst_attr->la_atime = le64_to_cpu(src_attr->la_atime); + dst_attr->la_ctime = le64_to_cpu(src_attr->la_ctime); + dst_attr->la_blocks = le64_to_cpu(src_attr->la_blocks); + dst_attr->la_mode = le32_to_cpu(src_attr->la_mode); + dst_attr->la_uid = le32_to_cpu(src_attr->la_uid); + dst_attr->la_gid = le32_to_cpu(src_attr->la_gid); + dst_attr->la_flags = le32_to_cpu(src_attr->la_flags); + dst_attr->la_nlink = le32_to_cpu(src_attr->la_nlink); + dst_attr->la_blkbits = le32_to_cpu(src_attr->la_blkbits); + dst_attr->la_blksize = le32_to_cpu(src_attr->la_blksize); + dst_attr->la_rdev = le32_to_cpu(src_attr->la_rdev); + dst_attr->la_valid = le64_to_cpu(src_attr->la_valid); +} + /** Bit-mask of valid attributes */ enum la_valid { LA_ATIME = 1 << 0, diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 2e56849..6c4c384 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -43,6 +43,37 @@ #include #include +/* Each one represents a distribute transaction replay + * operation, and updates on each MDTs are linked to + * dtr_sub_list */ +struct distribute_txn_replay_req { + /* update record */ + struct llog_update_record *dtrq_lur; + int dtrq_lur_size; + + /* linked to the distribute transaction replay + * list (tdtd_replay_list) */ + struct list_head dtrq_list; + + /* all of sub updates are linked here */ + struct list_head dtrq_sub_list; + spinlock_t dtrq_sub_list_lock; +}; + +/* Each one represents a sub replay item under a distribute + * transaction. A distribute transaction will be operated in + * two or more MDTs, and updates on each MDT will be represented + * by this structure */ +struct distribute_txn_replay_req_sub { + __u32 dtrqs_mdt_index; + struct llog_cookie dtrqs_llog_cookie; + struct list_head dtrqs_list; +}; + +struct target_distribute_txn_data; +typedef int (*distribute_txn_replay_handler_t)(struct lu_env *env, + struct target_distribute_txn_data *tdtd, + struct distribute_txn_replay_req *dtrq); struct target_distribute_txn_data { /* Distribution ID is used to identify updates log on different * MDTs for one operation */ @@ -50,6 +81,7 @@ struct target_distribute_txn_data { __u64 tdtd_batchid; struct lu_target *tdtd_lut; struct dt_object *tdtd_batchid_obj; + struct dt_device *tdtd_dt; /* Committed batchid for distribute transaction */ __u64 tdtd_committed_batchid; @@ -60,6 +92,15 @@ struct target_distribute_txn_data { /* Threads to manage distribute transaction */ wait_queue_head_t tdtd_commit_thread_waitq; atomic_t tdtd_refcount; + + /* recovery update */ + distribute_txn_replay_handler_t tdtd_replay_handler; + struct list_head tdtd_replay_list; + spinlock_t tdtd_replay_list_lock; + /* last replay update transno */ + __u64 tdtd_last_update_transno; + __u32 tdtd_replay_ready:1; + }; struct lu_target { @@ -363,6 +404,22 @@ int distribute_txn_init(const struct lu_env *env, void distribute_txn_fini(const struct lu_env *env, struct target_distribute_txn_data *tdtd); +/* target/update_recovery.c */ +int insert_update_records_to_replay_list(struct target_distribute_txn_data *, + struct llog_update_record *, + struct llog_cookie *, __u32); +void dtrq_list_dump(struct target_distribute_txn_data *tdtd, + unsigned int mask); +void dtrq_list_destroy(struct target_distribute_txn_data *tdtd); +int distribute_txn_replay_handle(struct lu_env *env, + struct target_distribute_txn_data *tdtd, + struct distribute_txn_replay_req *dtrq); +__u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd); +struct distribute_txn_replay_req * +distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd); +void dtrq_destory(struct distribute_txn_replay_req *dtrq); +struct distribute_txn_replay_req_sub * +dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index); enum { ESERIOUS = 0x0001000 diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index c5c4f9e..0a129dc 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -983,6 +983,9 @@ struct lu_orphan_ent { }; void lustre_swab_orphan_ent(struct lu_orphan_ent *ent); +struct update_ops; +void lustre_swab_update_ops(struct update_ops *uops, unsigned int op_count); + /** @} lu_fid */ /** \defgroup lu_dir lu_dir diff --git a/lustre/include/lustre_fld.h b/lustre/include/lustre_fld.h index 807649d..f02639b 100644 --- a/lustre/include/lustre_fld.h +++ b/lustre/include/lustre_fld.h @@ -106,6 +106,7 @@ struct lu_server_fld { * if the MDT is upgraded from < 2.6 to 2.6, i.e. when the * local FLDB is being invited */ unsigned int lsf_new:1; + }; struct lu_client_fld { diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index 8a42fb8..1417804 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -102,6 +102,23 @@ update_params_get_param(const struct update_params *params, return param; } +static inline void* +update_params_get_param_buf(const struct update_params *params, __u16 index, + unsigned int param_count, __u16 *size) +{ + struct object_update_param *param; + + param = update_params_get_param(params, (unsigned int)index, + param_count); + if (param == NULL) + return NULL; + + if (size != NULL) + *size = param->oup_len; + + return ¶m->oup_buf[0]; +} + struct update_op { struct lu_fid uop_fid; __u16 uop_type; @@ -377,6 +394,59 @@ struct sub_thandle { bool st_committed:1; }; +struct tx_arg; +typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th, + struct tx_arg *ta); + +/* Structure for holding one update executation */ +struct tx_arg { + tx_exec_func_t exec_fn; + tx_exec_func_t undo_fn; + struct dt_object *object; + const char *file; + struct object_update_reply *reply; + int line; + int index; + union { + struct { + struct dt_insert_rec rec; + const struct dt_key *key; + } insert; + struct { + } ref; + struct { + struct lu_attr attr; + } attr_set; + struct { + struct lu_buf buf; + const char *name; + int flags; + __u32 csum; + } xattr_set; + struct { + struct lu_attr attr; + struct dt_allocation_hint hint; + struct dt_object_format dof; + struct lu_fid fid; + } create; + struct { + struct lu_buf buf; + loff_t pos; + } write; + struct { + struct ost_body *body; + } destroy; + } u; +}; + +/* Structure for holding all update executations of one transaction */ +struct thandle_exec_args { + struct thandle *ta_handle; + int ta_argno; /* used args */ + int ta_alloc_args; /* allocated args count */ + struct tx_arg **ta_args; +}; + /* target/out_lib.c */ int out_update_pack(const struct lu_env *env, struct object_update *update, size_t max_update_size, enum update_type op, @@ -466,6 +536,9 @@ static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt) struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt, struct dt_device *dt_dev); +int sub_thandle_trans_create(const struct lu_env *env, + struct top_thandle *top_th, + struct sub_thandle *st); /* update_records.c */ int update_records_create_pack(const struct lu_env *env, diff --git a/lustre/include/obd.h b/lustre/include/obd.h index ee7873a..ce5457f 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -533,7 +533,8 @@ struct obd_device { * (for /proc/status only!!) */ obd_no_ir:1, /* no imperative recovery. */ obd_process_conf:1, /* device is processing mgs config */ - obd_uses_nid_stats:1; /* maintain per-client OBD stats */ + obd_uses_nid_stats:1, /* maintain per-client OBD stats */ + obd_force_abort_recovery:1; /* abort recovery forcely */ /* use separate field as it is set in interrupt to don't mess with * protection of other bits using _bh lock */ diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index cf48cf1..2c288c2 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -771,6 +771,8 @@ int target_handle_connect(struct ptlrpc_request *req) char *target_start; int target_len; bool mds_conn = false, lw_client = false; + bool mds_mds_conn = false; + bool new_mds_mds_conn = false; struct obd_connect_data *data, *tmpdata; int size, tmpsize; lnet_nid_t *client_nid = NULL; @@ -907,10 +909,15 @@ int target_handle_connect(struct ptlrpc_request *req) } } + /* Note: lw_client is needed in MDS-MDS failover during update log + * processing, so we needs to allow lw_client to be connected at + * anytime, instead of only the initial connection */ + lw_client = (data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0; + if (lustre_msg_get_op_flags(req->rq_reqmsg) & MSG_CONNECT_INITIAL) { mds_conn = (data->ocd_connect_flags & OBD_CONNECT_MDS) != 0; - lw_client = (data->ocd_connect_flags & - OBD_CONNECT_LIGHTWEIGHT) != 0; + mds_mds_conn = (data->ocd_connect_flags & + OBD_CONNECT_MDS_MDS) != 0; /* OBD_CONNECT_MNE_SWAB is defined as OBD_CONNECT_MDS_MDS * for Imperative Recovery connection from MGC to MGS. @@ -1062,7 +1069,9 @@ no_export: if (export == NULL) { /* allow lightweight connections during recovery */ - if (target->obd_recovering && !lw_client) { + /* allow "new" MDT to be connected during recovery, since we + * need retrieve recovery update records from it */ + if (target->obd_recovering && !lw_client && !mds_mds_conn) { cfs_time_t t; int c; /* connected */ int i; /* in progress */ @@ -1095,6 +1104,9 @@ dont_check_exports: MSG_CONNECT_RECOVERING); if (rc == 0) conn.cookie = export->exp_handle.h_cookie; + + if (mds_mds_conn) + new_mds_mds_conn = true; } } else { rc = obd_reconnect(req->rq_svc_thread->t_env, @@ -1212,6 +1224,14 @@ dont_check_exports: atomic_inc(&target->obd_req_replay_clients); atomic_inc(&target->obd_lock_replay_clients); + /* Note: MDS-MDS connection is allowed to be connected during + * recovery, no matter if the exports needs to be recoveried. + * Because we need retrieve updates logs from all other MDTs. + * So if the MDS-MDS export is new, obd_max_recoverable_clients + * also needs to be increased to match other recovery checking + * condition. */ + if (new_mds_mds_conn) + target->obd_max_recoverable_clients++; if (atomic_inc_return(&target->obd_connected_clients) == target->obd_max_recoverable_clients) wake_up(&target->obd_next_transno_waitq); @@ -1392,8 +1412,9 @@ static void target_exp_dequeue_req_replay(struct ptlrpc_request *req) spin_unlock(&req->rq_export->exp_lock); } -static void target_finish_recovery(struct obd_device *obd) +static void target_finish_recovery(struct lu_target *lut) { + struct obd_device *obd = lut->lut_obd; ENTRY; /* Only log a recovery message when recovery has occurred. */ @@ -1426,6 +1447,10 @@ static void target_finish_recovery(struct obd_device *obd) } spin_unlock(&obd->obd_recovery_task_lock); + if (lut->lut_tdtd != NULL && + !list_empty(&lut->lut_tdtd->tdtd_replay_list)) + dtrq_list_dump(lut->lut_tdtd, D_ERROR); + obd->obd_recovery_end = cfs_time_current_sec(); /* When recovery finished, cleanup orphans on MDS and OST. */ @@ -1502,6 +1527,7 @@ void target_cleanup_recovery(struct obd_device *obd) return; } obd->obd_recovering = obd->obd_abort_recovery = 0; + obd->obd_force_abort_recovery = 0; spin_unlock(&obd->obd_dev_lock); spin_lock(&obd->obd_recovery_task_lock); @@ -1542,7 +1568,8 @@ static void target_start_recovery_timer(struct obd_device *obd) return; spin_lock(&obd->obd_dev_lock); - if (!obd->obd_recovering || obd->obd_abort_recovery) { + if (!obd->obd_recovering || obd->obd_abort_recovery || + obd->obd_force_abort_recovery) { spin_unlock(&obd->obd_dev_lock); return; } @@ -1583,7 +1610,8 @@ static void extend_recovery_timer(struct obd_device *obd, int drt, bool extend) int to; spin_lock(&obd->obd_dev_lock); - if (!obd->obd_recovering || obd->obd_abort_recovery) { + if (!obd->obd_recovering || obd->obd_abort_recovery || + obd->obd_force_abort_recovery) { spin_unlock(&obd->obd_dev_lock); return; } @@ -1685,23 +1713,14 @@ static inline int exp_finished(struct obd_export *exp) return (exp->exp_in_recovery && !exp->exp_lock_replay_needed); } -/** Checking routines for recovery */ -static int check_for_clients(struct obd_device *obd) -{ - unsigned int clnts = atomic_read(&obd->obd_connected_clients); - - if (obd->obd_abort_recovery || obd->obd_recovery_expired) - return 1; - LASSERT(clnts <= obd->obd_max_recoverable_clients); - return (clnts + obd->obd_stale_clients == - obd->obd_max_recoverable_clients); -} - -static int check_for_next_transno(struct obd_device *obd) +static int check_for_next_transno(struct lu_target *lut) { struct ptlrpc_request *req = NULL; + struct obd_device *obd = lut->lut_obd; int wake_up = 0, connected, completed, queue_len; - __u64 next_transno, req_transno; + __u64 req_transno = 0; + __u64 update_transno = 0; + __u64 next_transno = 0; ENTRY; spin_lock(&obd->obd_recovery_task_lock); @@ -1709,8 +1728,14 @@ static int check_for_next_transno(struct obd_device *obd) req = list_entry(obd->obd_req_replay_queue.next, struct ptlrpc_request, rq_list); req_transno = lustre_msg_get_transno(req->rq_reqmsg); - } else { - req_transno = 0; + } + + if (lut->lut_tdtd != NULL) { + struct target_distribute_txn_data *tdtd; + __u64 update_transno; + + tdtd = lut->lut_tdtd; + update_transno = distribute_txn_get_next_transno(lut->lut_tdtd); } connected = atomic_read(&obd->obd_connected_clients); @@ -1723,13 +1748,14 @@ static int check_for_next_transno(struct obd_device *obd) obd->obd_max_recoverable_clients, connected, completed, queue_len, req_transno, next_transno); - if (obd->obd_abort_recovery) { + if (obd->obd_abort_recovery || obd->obd_force_abort_recovery) { CDEBUG(D_HA, "waking for aborted recovery\n"); wake_up = 1; } else if (obd->obd_recovery_expired) { CDEBUG(D_HA, "waking for expired recovery\n"); wake_up = 1; - } else if (req_transno == next_transno) { + } else if (req_transno == next_transno || (update_transno != 0 && + update_transno <= next_transno)) { CDEBUG(D_HA, "waking for next ("LPD64")\n", next_transno); wake_up = 1; } else if (queue_len > 0 && @@ -1745,10 +1771,10 @@ static int check_for_next_transno(struct obd_device *obd) CDEBUG(d_lvl, "%s: waking for gap in transno, VBR is %s (skip: " LPD64", ql: %d, comp: %d, conn: %d, next: "LPD64 - ", last_committed: "LPD64")\n", + ", next_update "LPD64" last_committed: "LPD64")\n", obd->obd_name, obd->obd_version_recov ? "ON" : "OFF", next_transno, queue_len, completed, connected, - req_transno, obd->obd_last_committed); + req_transno, update_transno, obd->obd_last_committed); obd->obd_next_recovery_transno = req_transno; wake_up = 1; } else if (atomic_read(&obd->obd_req_replay_clients) == 0) { @@ -1764,8 +1790,9 @@ static int check_for_next_transno(struct obd_device *obd) return wake_up; } -static int check_for_next_lock(struct obd_device *obd) +static int check_for_next_lock(struct lu_target *lut) { + struct obd_device *obd = lut->lut_obd; int wake_up = 0; spin_lock(&obd->obd_recovery_task_lock); @@ -1775,7 +1802,7 @@ static int check_for_next_lock(struct obd_device *obd) } else if (atomic_read(&obd->obd_lock_replay_clients) == 0) { CDEBUG(D_HA, "waking for completed lock replay\n"); wake_up = 1; - } else if (obd->obd_abort_recovery) { + } else if (obd->obd_abort_recovery || obd->obd_force_abort_recovery) { CDEBUG(D_HA, "waking for aborted recovery\n"); wake_up = 1; } else if (obd->obd_recovery_expired) { @@ -1792,10 +1819,11 @@ static int check_for_next_lock(struct obd_device *obd) * check its status with help of check_routine * evict dead clients via health_check */ -static int target_recovery_overseer(struct obd_device *obd, - int (*check_routine)(struct obd_device *), +static int target_recovery_overseer(struct lu_target *lut, + int (*check_routine)(struct lu_target *), int (*health_check)(struct obd_export *)) { + struct obd_device *obd = lut->lut_obd; repeat: if ((obd->obd_recovery_start != 0) && (cfs_time_current_sec() >= (obd->obd_recovery_start + obd->obd_recovery_time_hard))) { @@ -1804,11 +1832,11 @@ repeat: } while (wait_event_timeout(obd->obd_next_transno_waitq, - check_routine(obd), + check_routine(lut), msecs_to_jiffies(60 * MSEC_PER_SEC)) == 0) /* wait indefinitely for event, but don't trigger watchdog */; - if (obd->obd_abort_recovery) { + if (obd->obd_abort_recovery || obd->obd_force_abort_recovery) { CWARN("recovery is aborted, evict exports in recovery\n"); /** evict exports which didn't finish recovery yet */ class_disconnect_stale_exports(obd, exp_finished); @@ -1835,50 +1863,13 @@ repeat: return 0; } -static struct ptlrpc_request *target_next_replay_req(struct obd_device *obd) -{ - struct ptlrpc_request *req = NULL; - ENTRY; - - CDEBUG(D_HA, "Waiting for transno "LPD64"\n", - obd->obd_next_recovery_transno); - - CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLAY_DELAY2, cfs_fail_val); - /** It is needed to extend recovery window above recovery_time_soft. - * Extending is possible only in the end of recovery window - * (see more details in handle_recovery_req). - */ - CFS_FAIL_TIMEOUT_MS(OBD_FAIL_TGT_REPLAY_DELAY, 300); - - if (target_recovery_overseer(obd, check_for_next_transno, - exp_req_replay_healthy)) { - abort_req_replay_queue(obd); - abort_lock_replay_queue(obd); - } - - spin_lock(&obd->obd_recovery_task_lock); - if (!list_empty(&obd->obd_req_replay_queue)) { - req = list_entry(obd->obd_req_replay_queue.next, - struct ptlrpc_request, rq_list); - list_del_init(&req->rq_list); - obd->obd_requests_queued_for_recovery--; - spin_unlock(&obd->obd_recovery_task_lock); - } else { - spin_unlock(&obd->obd_recovery_task_lock); - LASSERT(list_empty(&obd->obd_req_replay_queue)); - LASSERT(atomic_read(&obd->obd_req_replay_clients) == 0); - /** evict exports failed VBR */ - class_disconnect_stale_exports(obd, exp_vbr_healthy); - } - RETURN(req); -} - -static struct ptlrpc_request *target_next_replay_lock(struct obd_device *obd) +static struct ptlrpc_request *target_next_replay_lock(struct lu_target *lut) { + struct obd_device *obd = lut->lut_obd; struct ptlrpc_request *req = NULL; CDEBUG(D_HA, "Waiting for lock\n"); - if (target_recovery_overseer(obd, check_for_next_lock, + if (target_recovery_overseer(lut, check_for_next_lock, exp_lock_replay_healthy)) abort_lock_replay_queue(obd); @@ -1972,6 +1963,318 @@ static void handle_recovery_req(struct ptlrpc_thread *thread, EXIT; } +/** Checking routines for recovery */ +static int check_for_recovery_ready(struct lu_target *lut) +{ + struct obd_device *obd = lut->lut_obd; + unsigned int clnts = atomic_read(&obd->obd_connected_clients); + + CDEBUG(D_HA, "connected %d stale %d max_recoverable_clients %d" + " abort %d expired %d\n", clnts, obd->obd_stale_clients, + obd->obd_max_recoverable_clients, obd->obd_abort_recovery, + obd->obd_recovery_expired); + + if (obd->obd_force_abort_recovery) + return 1; + + if (!obd->obd_abort_recovery && !obd->obd_recovery_expired) { + LASSERT(clnts <= obd->obd_max_recoverable_clients); + if (clnts + obd->obd_stale_clients < + obd->obd_max_recoverable_clients) + return 0; + } + + if (lut->lut_tdtd != NULL) { + if (!lut->lut_tdtd->tdtd_replay_ready) { + /* Let's extend recovery timer, in case the recovery + * timer expired, and some clients got evicted */ + extend_recovery_timer(obd, obd->obd_recovery_timeout, + true); + return 0; + } else { + dtrq_list_dump(lut->lut_tdtd, D_HA); + } + } + + return 1; +} + +enum { + REQUEST_RECOVERY = 1, + UPDATE_RECOVERY = 2, +}; + +static __u64 get_next_replay_req_transno(struct obd_device *obd) +{ + __u64 transno = 0; + + if (!list_empty(&obd->obd_req_replay_queue)) { + struct ptlrpc_request *req; + + req = list_entry(obd->obd_req_replay_queue.next, + struct ptlrpc_request, rq_list); + transno = lustre_msg_get_transno(req->rq_reqmsg); + } + + return transno; +} +__u64 get_next_transno(struct lu_target *lut, int *type) +{ + struct obd_device *obd = lut->lut_obd; + struct target_distribute_txn_data *tdtd = lut->lut_tdtd; + __u64 transno = 0; + __u64 update_transno; + ENTRY; + + transno = get_next_replay_req_transno(obd); + if (type != NULL) + *type = REQUEST_RECOVERY; + + if (tdtd == NULL) + RETURN(transno); + + update_transno = distribute_txn_get_next_transno(tdtd); + if (transno == 0 || (transno >= update_transno && + update_transno != 0)) { + transno = update_transno; + if (type != NULL) + *type = UPDATE_RECOVERY; + } + + RETURN(transno); +} + +/** + * drop duplicate replay request + * + * Because the operation has been replayed by update recovery, the request + * with the same transno will be dropped and also notify the client to send + * next replay request. + * + * \param[in] env execution environment + * \param[in] obd failover obd device + * \param[in] req request to be dropped + */ +static void drop_duplicate_replay_req(struct lu_env *env, + struct obd_device *obd, + struct ptlrpc_request *req) +{ + DEBUG_REQ(D_HA, req, "remove t"LPD64" from %s because of duplicate" + " update records are found.\n", + lustre_msg_get_transno(req->rq_reqmsg), + libcfs_nid2str(req->rq_peer.nid)); + + /* Right now, only for MDS reint operation update replay and + * normal request replay can have the same transno */ + if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_REINT) { + req_capsule_set(&req->rq_pill, &RQF_MDS_REINT); + req->rq_status = req_capsule_server_pack(&req->rq_pill); + if (likely(req->rq_export)) + target_committed_to_req(req); + lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); + target_send_reply(req, req->rq_status, 0); + } else { + DEBUG_REQ(D_ERROR, req, "wrong opc" "from %s\n", + libcfs_nid2str(req->rq_peer.nid)); + } + target_exp_dequeue_req_replay(req); + target_request_copy_put(req); + obd->obd_replayed_requests++; +} + +/** + * Update last_rcvd of the update + * + * Because update recovery might update the last_rcvd by updates, i.e. + * it will not update the last_rcvd information in memory, so we need + * refresh these information in memory after update recovery. + * + * \param[in] obd obd_device under recoverying. + * \param[in] dtrq the update replay requests being replayed. + */ +static void target_update_lcd(struct lu_env *env, struct lu_target *lut, + struct distribute_txn_replay_req *dtrq) +{ + struct obd_device *obd = lut->lut_obd; + struct obd_export *export; + struct tg_export_data *ted; + struct distribute_txn_replay_req_sub *dtrqs; + struct seq_server_site *site; + struct update_records *ur; + const struct lu_fid *fid; + struct update_ops *ops; + struct update_params *params; + struct update_op *op; + __u32 mdt_index; + unsigned int i; + struct lsd_client_data *lcd = NULL; + + /* if Updates has been executed(committed) on the recovery target, + * i.e. the updates is not being executed on the target, so we do + * not need update it in memory */ + site = lu_site2seq(obd->obd_lu_dev->ld_site); + mdt_index = site->ss_node_id; + dtrqs = dtrq_sub_lookup(dtrq, mdt_index); + if (dtrqs != NULL) + return; + + if (dtrq->dtrq_lur == NULL) + return; + + /* Find the update last_rcvd record */ + fid = lu_object_fid(&lut->lut_last_rcvd->do_lu); + ur = &dtrq->dtrq_lur->lur_update_rec; + ops = &ur->ur_ops; + params = update_records_get_params(ur); + for (i = 0, op = &ops->uops_op[0]; i < ur->ur_update_count; + i++, op = update_op_next_op(op)) { + __u64 pos; + __u16 size; + void *buf; + + if (!lu_fid_eq(&op->uop_fid, fid)) + continue; + + if (op->uop_type != OUT_WRITE) + continue; + + buf = update_params_get_param_buf(params, op->uop_params_off[1], + ur->ur_param_count, NULL); + if (buf == NULL) + continue; + + pos = le64_to_cpu(*(__u64 *)buf); + if (pos == 0) + continue; + + buf = update_params_get_param_buf(params, op->uop_params_off[0], + ur->ur_param_count, &size); + if (buf == NULL) + continue; + + if (size != sizeof(*lcd)) + continue; + lcd = buf; + } + + if (lcd == NULL || lcd->lcd_uuid[0] == '\0') + return; + + /* locate the export then update the exp_target_data if needed */ + export = cfs_hash_lookup(obd->obd_uuid_hash, lcd->lcd_uuid); + if (export == NULL) + return; + + ted = &export->exp_target_data; + if (lcd->lcd_last_xid > ted->ted_lcd->lcd_last_xid) { + CDEBUG(D_HA, "%s update xid from "LPU64" to "LPU64"\n", + lut->lut_obd->obd_name, ted->ted_lcd->lcd_last_xid, + lcd->lcd_last_xid); + ted->ted_lcd->lcd_last_xid = lcd->lcd_last_xid; + ted->ted_lcd->lcd_last_result = lcd->lcd_last_result; + } + class_export_put(export); +} + +static void replay_request_or_update(struct lu_env *env, + struct lu_target *lut, + struct target_recovery_data *trd, + struct ptlrpc_thread *thread) +{ + struct obd_device *obd = lut->lut_obd; + struct ptlrpc_request *req = NULL; + int type; + __u64 transno; + ENTRY; + + CDEBUG(D_HA, "Waiting for transno "LPD64"\n", + obd->obd_next_recovery_transno); + + /* Replay all of request and update by transno */ + do { + struct target_distribute_txn_data *tdtd = lut->lut_tdtd; + + CFS_FAIL_TIMEOUT(OBD_FAIL_TGT_REPLAY_DELAY2, cfs_fail_val); + + /** It is needed to extend recovery window above + * recovery_time_soft. Extending is possible only in the + * end of recovery window (see more details in + * handle_recovery_req()). + */ + CFS_FAIL_TIMEOUT_MS(OBD_FAIL_TGT_REPLAY_DELAY, 300); + + if (target_recovery_overseer(lut, check_for_next_transno, + exp_req_replay_healthy)) { + abort_req_replay_queue(obd); + abort_lock_replay_queue(obd); + } + + spin_lock(&obd->obd_recovery_task_lock); + transno = get_next_transno(lut, &type); + if (type == REQUEST_RECOVERY && tdtd != NULL && + transno == tdtd->tdtd_last_update_transno) { + /* Drop replay request from client side, if the + * replay has been executed by update with the + * same transno */ + req = list_entry(obd->obd_req_replay_queue.next, + struct ptlrpc_request, rq_list); + list_del_init(&req->rq_list); + obd->obd_requests_queued_for_recovery--; + spin_unlock(&obd->obd_recovery_task_lock); + drop_duplicate_replay_req(env, obd, req); + } else if (type == REQUEST_RECOVERY && transno != 0) { + req = list_entry(obd->obd_req_replay_queue.next, + struct ptlrpc_request, rq_list); + list_del_init(&req->rq_list); + obd->obd_requests_queued_for_recovery--; + spin_unlock(&obd->obd_recovery_task_lock); + LASSERT(trd->trd_processing_task == current_pid()); + DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s", + lustre_msg_get_transno(req->rq_reqmsg), + libcfs_nid2str(req->rq_peer.nid)); + + handle_recovery_req(thread, req, + trd->trd_recovery_handler); + /** + * bz18031: increase next_recovery_transno before + * target_request_copy_put() will drop exp_rpc reference + */ + spin_lock(&obd->obd_recovery_task_lock); + obd->obd_next_recovery_transno++; + spin_unlock(&obd->obd_recovery_task_lock); + target_exp_dequeue_req_replay(req); + target_request_copy_put(req); + obd->obd_replayed_requests++; + } else if (type == UPDATE_RECOVERY && transno != 0) { + struct distribute_txn_replay_req *dtrq; + + spin_unlock(&obd->obd_recovery_task_lock); + + LASSERT(tdtd != NULL); + dtrq = distribute_txn_get_next_req(tdtd); + lu_context_enter(&thread->t_env->le_ctx); + tdtd->tdtd_replay_handler(env, tdtd, dtrq); + lu_context_exit(&thread->t_env->le_ctx); + extend_recovery_timer(obd, obd_timeout, true); + LASSERT(tdtd->tdtd_last_update_transno <= transno); + tdtd->tdtd_last_update_transno = transno; + spin_lock(&obd->obd_recovery_task_lock); + if (transno > obd->obd_next_recovery_transno) + obd->obd_next_recovery_transno = transno; + spin_unlock(&obd->obd_recovery_task_lock); + target_update_lcd(env, lut, dtrq); + dtrq_destory(dtrq); + } else { + spin_unlock(&obd->obd_recovery_task_lock); + LASSERT(list_empty(&obd->obd_req_replay_queue)); + LASSERT(atomic_read(&obd->obd_req_replay_clients) == 0); + /** evict exports failed VBR */ + class_disconnect_stale_exports(obd, exp_vbr_healthy); + break; + } + } while (1); +} + static int target_recovery_thread(void *arg) { struct lu_target *lut = arg; @@ -2017,43 +2320,28 @@ static int target_recovery_thread(void *arg) spin_unlock(&obd->obd_dev_lock); complete(&trd->trd_starting); - /* first of all, we have to know the first transno to replay */ - if (target_recovery_overseer(obd, check_for_clients, - exp_connect_healthy)) { - abort_req_replay_queue(obd); - abort_lock_replay_queue(obd); - } + /* first of all, we have to know the first transno to replay */ + if (target_recovery_overseer(lut, check_for_recovery_ready, + exp_connect_healthy)) { + abort_req_replay_queue(obd); + abort_lock_replay_queue(obd); + if (lut->lut_tdtd != NULL) + dtrq_list_destroy(lut->lut_tdtd); + } - /* next stage: replay requests */ + /* next stage: replay requests or update */ delta = jiffies; CDEBUG(D_INFO, "1: request replay stage - %d clients from t"LPU64"\n", atomic_read(&obd->obd_req_replay_clients), obd->obd_next_recovery_transno); - while ((req = target_next_replay_req(obd))) { - LASSERT(trd->trd_processing_task == current_pid()); - DEBUG_REQ(D_HA, req, "processing t"LPD64" from %s", - lustre_msg_get_transno(req->rq_reqmsg), - libcfs_nid2str(req->rq_peer.nid)); - handle_recovery_req(thread, req, - trd->trd_recovery_handler); - /** - * bz18031: increase next_recovery_transno before - * target_request_copy_put() will drop exp_rpc reference - */ - spin_lock(&obd->obd_recovery_task_lock); - obd->obd_next_recovery_transno++; - spin_unlock(&obd->obd_recovery_task_lock); - target_exp_dequeue_req_replay(req); - target_request_copy_put(req); - obd->obd_replayed_requests++; - } + replay_request_or_update(env, lut, trd, thread); /** * The second stage: replay locks */ CDEBUG(D_INFO, "2: lock replay stage - %d clients\n", atomic_read(&obd->obd_lock_replay_clients)); - while ((req = target_next_replay_lock(obd))) { + while ((req = target_next_replay_lock(lut))) { LASSERT(trd->trd_processing_task == current_pid()); DEBUG_REQ(D_HA, req, "processing lock from %s: ", libcfs_nid2str(req->rq_peer.nid)); @@ -2100,7 +2388,7 @@ static int target_recovery_thread(void *arg) libcfs_debug_dumplog(); } - target_finish_recovery(obd); + target_finish_recovery(lut); lu_context_fini(&env->le_ctx); trd->trd_processing_task = 0; @@ -2176,6 +2464,7 @@ static void target_recovery_expired(unsigned long castmeharder) void target_recovery_init(struct lu_target *lut, svc_handler_t handler) { struct obd_device *obd = lut->lut_obd; + if (obd->obd_max_recoverable_clients == 0) { /** Update server last boot epoch */ tgt_boot_epoch_update(lut); @@ -2195,7 +2484,6 @@ void target_recovery_init(struct lu_target *lut, svc_handler_t handler) } EXPORT_SYMBOL(target_recovery_init); - static int target_process_req_flags(struct obd_device *obd, struct ptlrpc_request *req) { diff --git a/lustre/lod/lod_dev.c b/lustre/lod/lod_dev.c index 88d5800..7c7a1cb 100644 --- a/lustre/lod/lod_dev.c +++ b/lustre/lod/lod_dev.c @@ -276,6 +276,66 @@ struct lod_recovery_data { __u32 lrd_idx; }; + +/** + * process update recovery record + * + * Add the update recovery recode to the update recovery list in + * lod_recovery_data. Then the recovery thread (target_recovery_thread) + * will redo these updates. + * + * \param[in]env execution environment + * \param[in]llh log handle of update record + * \param[in]rec update record to be replayed + * \param[in]data update recovery data which holds the necessary + * arguments for recovery (see struct lod_recovery_data) + * + * \retval 0 if the record is processed successfully. + * \retval negative errno if the record processing fails. + */ +static int lod_process_recovery_updates(const struct lu_env *env, + struct llog_handle *llh, + struct llog_rec_hdr *rec, + void *data) +{ + struct lod_recovery_data *lrd = data; + struct llog_cookie *cookie = &lod_env_info(env)->lti_cookie; + struct lu_target *lut; + __u32 index = 0; + ENTRY; + + if (lrd->lrd_ltd == NULL) { + int rc; + + rc = lodname2mdt_index(lod2obd(lrd->lrd_lod)->obd_name, &index); + if (rc != 0) + return rc; + } else { + index = lrd->lrd_ltd->ltd_index; + } + + if (rec->lrh_len != + llog_update_record_size((struct llog_update_record *)rec)) { + CERROR("%s broken update record! index %u "DOSTID":%u : rc = %d\n", + lod2obd(lrd->lrd_lod)->obd_name, index, + POSTID(&llh->lgh_id.lgl_oi), rec->lrh_index, -EIO); + return -EIO; + } + + cookie->lgc_lgl = llh->lgh_id; + cookie->lgc_index = rec->lrh_index; + cookie->lgc_subsys = LLOG_UPDATELOG_ORIG_CTXT; + + CDEBUG(D_HA, "%s: process recovery updates "DOSTID":%u\n", + lod2obd(lrd->lrd_lod)->obd_name, + POSTID(&llh->lgh_id.lgl_oi), rec->lrh_index); + lut = lod2lu_dev(lrd->lrd_lod)->ld_site->ls_tgt; + + return insert_update_records_to_replay_list(lut->lut_tdtd, + (struct llog_update_record *)rec, + cookie, index); +} + /** * recovery thread for update log * @@ -293,6 +353,7 @@ static int lod_sub_recovery_thread(void *arg) struct lod_device *lod = lrd->lrd_lod; struct dt_device *dt; struct ptlrpc_thread *thread = lrd->lrd_thread; + struct llog_ctxt *ctxt; struct lu_env env; int rc; ENTRY; @@ -317,7 +378,53 @@ static int lod_sub_recovery_thread(void *arg) if (rc != 0) GOTO(out, rc); - /* XXX do recovery in the following patches */ + /* Process the recovery record */ + ctxt = llog_get_context(dt->dd_lu_dev.ld_obd, LLOG_UPDATELOG_ORIG_CTXT); + LASSERT(ctxt != NULL); + LASSERT(ctxt->loc_handle != NULL); + + rc = llog_cat_process(&env, ctxt->loc_handle, + lod_process_recovery_updates, lrd, 0, 0); + llog_ctxt_put(ctxt); + + if (rc < 0) { + CERROR("%s getting update log failed: rc = %d\n", + dt->dd_lu_dev.ld_obd->obd_name, rc); + GOTO(out, rc); + } + + CDEBUG(D_HA, "%s retrieve update log: rc = %d\n", + dt->dd_lu_dev.ld_obd->obd_name, rc); + + if (lrd->lrd_ltd == NULL) + lod->lod_child_got_update_log = 1; + else + lrd->lrd_ltd->ltd_got_update_log = 1; + + if (lod->lod_child_got_update_log) { + struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; + struct lod_tgt_desc *tgt = NULL; + bool all_got_log = true; + int i; + + cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { + tgt = LTD_TGT(ltd, i); + if (!tgt->ltd_got_update_log) { + all_got_log = false; + break; + } + } + + if (all_got_log) { + struct lu_target *lut; + + lut = lod2lu_dev(lod)->ld_site->ls_tgt; + CDEBUG(D_HA, "%s got update logs from all MDTs.\n", + lut->lut_obd->obd_name); + lut->lut_tdtd->tdtd_replay_ready = 1; + wake_up(&lut->lut_obd->obd_next_transno_waitq); + } + } out: OBD_FREE_PTR(lrd); @@ -443,18 +550,21 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod, struct l_wait_info lwi = { 0 }; struct lod_tgt_desc *sub_ltd = NULL; __u32 index; + __u32 master_index; int rc; ENTRY; + rc = lodname2mdt_index(lod2obd(lod)->obd_name, &master_index); + if (rc != 0) + RETURN(rc); + OBD_ALLOC_PTR(lrd); if (lrd == NULL) RETURN(-ENOMEM); if (lod->lod_child == dt) { thread = &lod->lod_child_recovery_thread; - rc = lodname2mdt_index(lod2obd(lod)->obd_name, &index); - if (rc != 0) - GOTO(free_lrd, rc); + index = master_index; } else { struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; struct lod_tgt_desc *tgt = NULL; @@ -476,6 +586,8 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod, thread = sub_ltd->ltd_recovery_thread; } + CDEBUG(D_INFO, "%s init sub log %s\n", lod2obd(lod)->obd_name, + dt->dd_lu_dev.ld_obd->obd_name); lrd->lrd_lod = lod; lrd->lrd_ltd = sub_ltd; lrd->lrd_thread = thread; @@ -493,8 +605,8 @@ int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod, } /* Start the recovery thread */ - task = kthread_run(lod_sub_recovery_thread, lrd, "lod_recov_%04x", - index); + task = kthread_run(lod_sub_recovery_thread, lrd, "lod%04x_rec%04x", + master_index, index); if (IS_ERR(task)) { rc = PTR_ERR(task); CERROR("%s: cannot start recovery thread: rc = %d\n", @@ -588,6 +700,12 @@ static int lod_prepare_distribute_txn(const struct lu_env *env, RETURN(rc); } + tdtd->tdtd_dt = &lod->lod_dt_dev; + INIT_LIST_HEAD(&tdtd->tdtd_replay_list); + spin_lock_init(&tdtd->tdtd_replay_list_lock); + tdtd->tdtd_replay_handler = distribute_txn_replay_handle; + tdtd->tdtd_replay_ready = 0; + lut->lut_tdtd = tdtd; RETURN(0); diff --git a/lustre/lod/lod_internal.h b/lustre/lod/lod_internal.h index 85c5007..60617cf 100644 --- a/lustre/lod/lod_internal.h +++ b/lustre/lod/lod_internal.h @@ -125,7 +125,8 @@ struct lod_tgt_desc { struct ptlrpc_thread *ltd_recovery_thread; unsigned long ltd_active:1,/* is this target up for requests */ ltd_activate:1,/* should target be activated */ - ltd_reap:1; /* should this target be deleted */ + ltd_reap:1, /* should this target be deleted */ + ltd_got_update_log:1; /* Already got update log */ }; #define TGT_PTRS 256 /* number of pointers at 1st level */ @@ -170,7 +171,8 @@ struct lod_device { int lod_connects; unsigned int lod_recovery_completed:1, lod_initialized:1, - lod_lmv_failout:1; + lod_lmv_failout:1, + lod_child_got_update_log:1; /* lov settings descriptor storing static information */ struct lov_desc lod_desc; @@ -301,6 +303,7 @@ struct lod_thread_info { struct lu_buf lti_linkea_buf; struct dt_insert_rec lti_dt_rec; struct llog_catid lti_cid; + struct llog_cookie lti_cookie; }; extern const struct lu_device_operations lod_lu_ops; @@ -360,12 +363,6 @@ static inline struct dt_object* lod_object_child(struct lod_object *o) struct dt_object, do_lu); } -static inline struct dt_object *dt_object_child(struct dt_object *o) -{ - return container_of0(lu_object_next(&(o)->do_lu), - struct dt_object, do_lu); -} - extern struct lu_context_key lod_thread_key; static inline struct lod_thread_info *lod_env_info(const struct lu_env *env) diff --git a/lustre/lod/lod_lov.c b/lustre/lod/lod_lov.c index cb33955..7054876 100644 --- a/lustre/lod/lod_lov.c +++ b/lustre/lod/lod_lov.c @@ -432,6 +432,13 @@ static void __lod_del_device(const struct lu_env *env, struct lod_device *lod, lfsck_del_target(env, lod->lod_child, LTD_TGT(ltd, idx)->ltd_tgt, idx, for_ost); + if (!for_ost && LTD_TGT(ltd, idx)->ltd_recovery_thread != NULL) { + struct ptlrpc_thread *thread; + + thread = LTD_TGT(ltd, idx)->ltd_recovery_thread; + OBD_FREE_PTR(thread); + } + if (LTD_TGT(ltd, idx)->ltd_reap == 0) { LTD_TGT(ltd, idx)->ltd_reap = 1; ltd->ltd_death_row++; diff --git a/lustre/lod/lod_sub_object.c b/lustre/lod/lod_sub_object.c index 05eb8a4..f1df917 100644 --- a/lustre/lod/lod_sub_object.c +++ b/lustre/lod/lod_sub_object.c @@ -944,8 +944,8 @@ out_close: llog_cat_close(env, ctxt->loc_handle); ctxt->loc_handle = NULL; } + out_put: llog_ctxt_put(ctxt); RETURN(rc); } - diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 1a453ff..a2cc964 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -4721,6 +4721,7 @@ static int mdt_prepare(const struct lu_env *env, } LASSERT(!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state)); + target_recovery_init(&mdt->mdt_lut, tgt_request_handle); set_bit(MDT_FL_CFGLOG, &mdt->mdt_state); LASSERT(obd->obd_no_conn); @@ -5575,6 +5576,7 @@ static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len, break; case OBD_IOC_ABORT_RECOVERY: CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt)); + obd->obd_force_abort_recovery = 1; target_stop_recovery_thread(obd); rc = 0; break; diff --git a/lustre/mgs/mgs_internal.h b/lustre/mgs/mgs_internal.h index 781cd2c..bf38873 100644 --- a/lustre/mgs/mgs_internal.h +++ b/lustre/mgs/mgs_internal.h @@ -370,11 +370,6 @@ static inline struct dt_object* mgs_object_child(struct mgs_object *o) struct dt_object, do_lu); } -static inline struct dt_object *dt_object_child(struct dt_object *o) -{ - return container_of0(lu_object_next(&(o)->do_lu), - struct dt_object, do_lu); -} struct mgs_direntry { struct list_head mde_list; char *mde_name; diff --git a/lustre/obdclass/llog_swab.c b/lustre/obdclass/llog_swab.c index 822e6d5..366dc19 100644 --- a/lustre/obdclass/llog_swab.c +++ b/lustre/obdclass/llog_swab.c @@ -44,6 +44,7 @@ #include +#include static void print_llogd_body(struct llogd_body *d) { @@ -127,6 +128,21 @@ void lustre_swab_lu_seq_range(struct lu_seq_range *range) } EXPORT_SYMBOL(lustre_swab_lu_seq_range); +void lustre_swab_update_ops(struct update_ops *uops, unsigned int op_count) +{ + unsigned int i; + unsigned int j; + + for (i = 0; i < op_count; i++) { + lustre_swab_lu_fid(&uops->uops_op[i].uop_fid); + __swab16s(&uops->uops_op[i].uop_type); + __swab16s(&uops->uops_op[i].uop_param_count); + for (j = 0; j < uops->uops_op[i].uop_param_count; j++) + __swab16s(&uops->uops_op[i].uop_params_off[j]); + } +} +EXPORT_SYMBOL(lustre_swab_update_ops); + void lustre_swab_llog_rec(struct llog_rec_hdr *rec) { struct llog_rec_tail *tail = NULL; @@ -272,6 +288,25 @@ void lustre_swab_llog_rec(struct llog_rec_hdr *rec) } case LLOG_PAD_MAGIC: break; + case UPDATE_REC: + { + struct llog_update_record *lur = + (struct llog_update_record *)rec; + struct update_records *record = &lur->lur_update_rec; + + __swab32s(&record->ur_flags); + __swab64s(&record->ur_batchid); + __swab64s(&record->ur_master_transno); + __swab32s(&record->ur_param_count); + __swab32s(&record->ur_update_count); + lustre_swab_update_ops(&record->ur_ops, + record->ur_update_count); + + /* Compute tail location. */ + tail = (struct llog_rec_tail *)((char *)record + + update_records_size(record)); + break; + } default: CERROR("Unknown llog rec type %#x swabbing rec %p\n", rec->lrh_type, rec); diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index da99fb0..2bb97a7 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -1300,6 +1300,7 @@ static int ofd_iocontrol(unsigned int cmd, struct obd_export *exp, int len, switch (cmd) { case OBD_IOC_ABORT_RECOVERY: CERROR("%s: aborting recovery\n", obd->obd_name); + obd->obd_force_abort_recovery = 1; target_stop_recovery_thread(obd); break; case OBD_IOC_SYNC: diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 6c3ad4f..73efec3 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -850,6 +850,27 @@ out: } /** + * Determine if the lock needs to be cancelled + * + * Determine if the unused lock should be cancelled before replay, see + * (ldlm_cancel_no_wait_policy()). Currently, only inode bits lock exists + * between MDTs. + * + * \param[in] lock lock to be checked. + * + * \retval 1 if the lock needs to be cancelled before replay. + * \retval 0 if the lock does not need to be cancelled before + * replay. + */ +static int osp_cancel_weight(struct ldlm_lock *lock) +{ + if (lock->l_resource->lr_type != LDLM_IBITS) + RETURN(0); + + RETURN(1); +} + +/** * Initialize OSP device according to the parameters in the configuration * log \a cfg. * @@ -1063,6 +1084,8 @@ static int osp_init0(const struct lu_env *env, struct osp_device *osp, GOTO(out_precreat, rc); } + ns_register_cancel(obd->obd_namespace, osp_cancel_weight); + /* * Initiate connect to OST */ @@ -1316,6 +1339,9 @@ static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp, if (rc) { CERROR("%s: can't connect obd: rc = %d\n", obd->obd_name, rc); GOTO(out, rc); + } else { + osp->opd_obd->u.cli.cl_seq->lcs_exp = + class_export_get(osp->opd_exp); } ptlrpc_pinger_add_import(imp); @@ -1433,26 +1459,6 @@ out: } /** - * Prepare fid client. - * - * This function prepares the FID client for the OSP. It will check and assign - * the export (to MDT0) for its FID client, so OSP can allocate super sequence - * or lookup sequence in FLDB of MDT0. - * - * \param[in] osp OSP device - */ -static void osp_prepare_fid_client(struct osp_device *osp) -{ - LASSERT(osp->opd_obd->u.cli.cl_seq != NULL); - if (osp->opd_obd->u.cli.cl_seq->lcs_exp != NULL) - return; - - LASSERT(osp->opd_exp != NULL); - osp->opd_obd->u.cli.cl_seq->lcs_exp = - class_export_get(osp->opd_exp); -} - -/** * Implementation of obd_ops::o_import_event * * This function is called when some related import event happens. It will @@ -1500,7 +1506,6 @@ static int osp_import_event(struct obd_device *obd, struct obd_import *imp, case IMP_EVENT_ACTIVE: d->opd_imp_active = 1; - osp_prepare_fid_client(d); if (d->opd_got_disconnected) d->opd_new_connection = 1; d->opd_imp_connected = 1; @@ -1665,8 +1670,7 @@ static int osp_fid_alloc(const struct lu_env *env, struct obd_export *exp, LASSERT(osp->opd_obd->u.cli.cl_seq != NULL); /* Sigh, fid client is not ready yet */ - if (osp->opd_obd->u.cli.cl_seq->lcs_exp == NULL) - RETURN(-ENOTCONN); + LASSERT(osp->opd_obd->u.cli.cl_seq->lcs_exp != NULL); RETURN(seq_client_alloc_fid(env, seq, fid)); } diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index a3aa8d3..b6e5cf4 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -791,7 +791,8 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, args = ptlrpc_req_async_args(req); args->oaua_update = dt_update; - if (!th->th_wait_submit && is_only_remote_trans(th) && !th->th_sync) { + if (is_only_remote_trans(th) && !th->th_sync && + !th->th_wait_submit) { args->oaua_flow_control = true; if (!osp->opd_connect_mdt) { @@ -959,7 +960,8 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, dt_update->dur_buf.ub_req->ourq_count == 0) GOTO(out, rc); - if (is_only_remote_trans(th) && !th->th_sync) { + if (is_only_remote_trans(th) && !th->th_sync && + !th->th_wait_submit) { struct osp_device *osp = dt2osp_dev(th->th_dev); struct client_obd *cli = &osp->opd_obd->u.cli; diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index 40f4914..37c86cf 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -13,6 +13,7 @@ target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o target_objs += $(TARGET)out_lib.o $(TARGET)update_trans.o target_objs += $(TARGET)update_records.o +target_objs += $(TARGET)update_recovery.o ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o diff --git a/lustre/target/Makefile.am b/lustre/target/Makefile.am index 924e3c0..23ad9a8 100644 --- a/lustre/target/Makefile.am +++ b/lustre/target/Makefile.am @@ -35,3 +35,4 @@ EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h \ out_handler.c out_lib.c EXTRA_DIST += update_trans.c EXTRA_DIST += update_records.c +EXTRA_DIST += update_recovery.c diff --git a/lustre/target/out_handler.c b/lustre/target/out_handler.c index 635ea99..11dc864 100644 --- a/lustre/target/out_handler.c +++ b/lustre/target/out_handler.c @@ -36,77 +36,6 @@ #include "tgt_internal.h" #include -static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta) -{ - struct tx_arg **new_ta; - int i; - int rc = 0; - - if (ta->ta_alloc_args >= new_alloc_ta) - return 0; - - OBD_ALLOC(new_ta, sizeof(*new_ta) * new_alloc_ta); - if (new_ta == NULL) - return -ENOMEM; - - for (i = 0; i < new_alloc_ta; i++) { - if (i < ta->ta_alloc_args) { - /* copy the old args to new one */ - new_ta[i] = ta->ta_args[i]; - } else { - OBD_ALLOC_PTR(new_ta[i]); - if (new_ta[i] == NULL) - GOTO(out, rc = -ENOMEM); - } - } - - /* free the old args */ - if (ta->ta_args != NULL) - OBD_FREE(ta->ta_args, sizeof(ta->ta_args[0]) * - ta->ta_alloc_args); - - ta->ta_args = new_ta; - ta->ta_alloc_args = new_alloc_ta; -out: - if (rc != 0) { - for (i = 0; i < new_alloc_ta; i++) { - if (new_ta[i] != NULL) - OBD_FREE_PTR(new_ta[i]); - } - OBD_FREE(new_ta, sizeof(*new_ta) * new_alloc_ta); - } - return rc; -} - -#define TX_ALLOC_STEP 8 -static struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, - tx_exec_func_t func, tx_exec_func_t undo, - const char *file, int line) -{ - int rc; - int i; - - LASSERT(ta != NULL); - LASSERT(func != NULL); - - if (ta->ta_argno + 1 >= ta->ta_alloc_args) { - rc = tx_extend_args(ta, ta->ta_alloc_args + TX_ALLOC_STEP); - if (rc != 0) - return ERR_PTR(rc); - } - - i = ta->ta_argno; - - ta->ta_argno++; - - ta->ta_args[i]->exec_fn = func; - ta->ta_args[i]->undo_fn = undo; - ta->ta_args[i]->file = file; - ta->ta_args[i]->line = line; - - return ta->ta_args[i]; -} - static void out_reconstruct(const struct lu_env *env, struct dt_device *dt, struct dt_object *obj, struct object_update_reply *reply, @@ -137,6 +66,22 @@ static inline int out_check_resent(const struct lu_env *env, return 0; if (req_xid_is_last(req)) { + struct lsd_client_data *lcd; + + /* XXX this does not support mulitple transactions yet, i.e. + * only 1 update RPC each time betwee MDTs */ + lcd = req->rq_export->exp_target_data.ted_lcd; + + req->rq_transno = lcd->lcd_last_transno; + req->rq_status = lcd->lcd_last_result; + if (req->rq_status != 0) + req->rq_transno = 0; + lustre_msg_set_transno(req->rq_repmsg, req->rq_transno); + lustre_msg_set_status(req->rq_repmsg, req->rq_status); + + DEBUG_REQ(D_RPCTRACE, req, "restoring transno "LPD64"status %d", + req->rq_transno, req->rq_status); + reconstruct(env, dt, obj, reply, index); return 1; } @@ -145,101 +90,6 @@ static inline int out_check_resent(const struct lu_env *env, return 0; } -static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj, - struct thandle *th) -{ - int rc; - - CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev), - PFID(lu_object_fid(&dt_obj->do_lu))); - - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_destroy(env, dt_obj, th); - dt_write_unlock(env, dt_obj); - - return rc; -} - -/** - * All of the xxx_undo will be used once execution failed, - * But because all of the required resource has been reserved in - * declare phase, i.e. if declare succeed, it should make sure - * the following executing phase succeed in anyway, so these undo - * should be useless for most of the time in Phase I - */ -static int out_tx_create_undo(const struct lu_env *env, struct thandle *th, - struct tx_arg *arg) -{ - int rc; - - rc = out_obj_destroy(env, arg->object, th); - if (rc != 0) - CERROR("%s: undo failure, we are doomed!: rc = %d\n", - dt_obd_name(th->th_dev), rc); - return rc; -} - -static int out_tx_create_exec(const struct lu_env *env, struct thandle *th, - struct tx_arg *arg) -{ - struct dt_object *dt_obj = arg->object; - int rc; - - CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n", - dt_obd_name(th->th_dev), - PFID(lu_object_fid(&arg->object->do_lu)), - arg->u.create.dof.dof_type, - arg->u.create.attr.la_mode & S_IFMT); - - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_create(env, dt_obj, &arg->u.create.attr, - &arg->u.create.hint, &arg->u.create.dof, th); - - dt_write_unlock(env, dt_obj); - - CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n", - dt_obd_name(th->th_dev), arg->reply, arg->index, rc); - - object_update_result_insert(arg->reply, NULL, 0, arg->index, rc); - - return rc; -} - -static int __out_tx_create(const struct lu_env *env, struct dt_object *obj, - struct lu_attr *attr, struct lu_fid *parent_fid, - struct dt_object_format *dof, - struct thandle_exec_args *ta, - struct object_update_reply *reply, - int index, const char *file, int line) -{ - struct tx_arg *arg; - int rc; - - LASSERT(ta->ta_handle != NULL); - rc = dt_declare_create(env, obj, attr, NULL, dof, - ta->ta_handle); - if (rc != 0) - return rc; - - arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file, - line); - if (IS_ERR(arg)) - return PTR_ERR(arg); - - /* release the object in out_trans_stop */ - lu_object_get(&obj->do_lu); - arg->object = obj; - arg->u.create.attr = *attr; - if (parent_fid != NULL) - arg->u.create.fid = *parent_fid; - memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint)); - arg->u.create.dof = *dof; - arg->reply = reply; - arg->index = index; - - return 0; -} - static int out_create(struct tgt_session_info *tsi) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); @@ -288,72 +138,13 @@ static int out_create(struct tgt_session_info *tsi) RETURN(-EEXIST); rc = out_tx_create(tsi->tsi_env, obj, attr, fid, dof, - &tti->tti_tea, + &tti->tti_tea, tti->tti_tea.ta_handle, tti->tti_u.update.tti_update_reply, tti->tti_u.update.tti_update_reply_index); RETURN(rc); } -static int out_tx_attr_set_undo(const struct lu_env *env, - struct thandle *th, struct tx_arg *arg) -{ - CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n", - dt_obd_name(th->th_dev), - PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP); - - return -ENOTSUPP; -} - -static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th, - struct tx_arg *arg) -{ - struct dt_object *dt_obj = arg->object; - int rc; - - CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev), - PFID(lu_object_fid(&dt_obj->do_lu))); - - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th); - dt_write_unlock(env, dt_obj); - - CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n", - dt_obd_name(th->th_dev), arg->reply, arg->index, rc); - - object_update_result_insert(arg->reply, NULL, 0, arg->index, rc); - - return rc; -} - -static int __out_tx_attr_set(const struct lu_env *env, - struct dt_object *dt_obj, - const struct lu_attr *attr, - struct thandle_exec_args *th, - struct object_update_reply *reply, - int index, const char *file, int line) -{ - struct tx_arg *arg; - int rc; - - LASSERT(th->ta_handle != NULL); - rc = dt_declare_attr_set(env, dt_obj, attr, th->ta_handle); - if (rc != 0) - return rc; - - arg = tx_add_exec(th, out_tx_attr_set_exec, out_tx_attr_set_undo, - file, line); - if (IS_ERR(arg)) - return PTR_ERR(arg); - - lu_object_get(&dt_obj->do_lu); - arg->object = dt_obj; - arg->u.attr_set.attr = *attr; - arg->reply = reply; - arg->index = index; - return 0; -} - static int out_attr_set(struct tgt_session_info *tsi) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); @@ -383,6 +174,7 @@ static int out_attr_set(struct tgt_session_info *tsi) la_from_obdo(attr, lobdo, lobdo->o_valid); rc = out_tx_attr_set(tsi->tsi_env, obj, attr, &tti->tti_tea, + tti->tti_tea.ta_handle, tti->tti_u.update.tti_update_reply, tti->tti_u.update.tti_update_reply_index); @@ -543,105 +335,6 @@ out_unlock: RETURN(rc); } -static int out_tx_xattr_set_exec(const struct lu_env *env, - struct thandle *th, - struct tx_arg *arg) -{ - struct dt_object *dt_obj = arg->object; - int rc; - - CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n", - dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf, - arg->u.xattr_set.name, arg->u.xattr_set.flags); - - if (!lu_object_exists(&dt_obj->do_lu)) - GOTO(out, rc = -ENOENT); - - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf, - arg->u.xattr_set.name, arg->u.xattr_set.flags, th); - /** - * Ignore errors if this is LINK EA - **/ - if (unlikely(rc != 0 && - strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) { - /* XXX: If the linkEA is overflow, then we need to notify the - * namespace LFSCK to skip "nlink" attribute verification - * on this object to avoid the "nlink" to be shrinked by - * wrong. It may be not good an interaction with LFSCK - * like this. We will consider to replace it with other - * mechanism in future. LU-5802. */ - if (rc == -ENOSPC) { - struct lfsck_request *lr = &tgt_th_info(env)->tti_lr; - - lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu), - LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE); - tgt_lfsck_in_notify(env, - tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th); - } - - rc = 0; - } - dt_write_unlock(env, dt_obj); - -out: - CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n", - dt_obd_name(th->th_dev), arg->reply, arg->index, rc); - - object_update_result_insert(arg->reply, NULL, 0, arg->index, rc); - - return rc; -} - -static int __out_tx_xattr_set(const struct lu_env *env, - struct dt_object *dt_obj, - const struct lu_buf *buf, - const char *name, int flags, - struct thandle_exec_args *ta, - struct object_update_reply *reply, - int index, const char *file, int line) -{ - struct tx_arg *arg; - int rc; - - LASSERT(ta->ta_handle != NULL); - rc = dt_declare_xattr_set(env, dt_obj, buf, name, flags, ta->ta_handle); - if (rc != 0) - return rc; - - if (strcmp(name, XATTR_NAME_LINK) == 0) { - struct lfsck_request *lr = &tgt_th_info(env)->tti_lr; - - /* XXX: If the linkEA is overflow, then we need to notify the - * namespace LFSCK to skip "nlink" attribute verification - * on this object to avoid the "nlink" to be shrinked by - * wrong. It may be not good an interaction with LFSCK - * like this. We will consider to replace it with other - * mechanism in future. LU-5802. */ - lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu), - LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE); - rc = tgt_lfsck_in_notify(env, - tgt_ses_info(env)->tsi_tgt->lut_bottom, - lr, ta->ta_handle); - if (rc != 0) - return rc; - } - - arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line); - if (IS_ERR(arg)) - return PTR_ERR(arg); - - lu_object_get(&dt_obj->do_lu); - arg->object = dt_obj; - arg->u.xattr_set.name = name; - arg->u.xattr_set.flags = flags; - arg->u.xattr_set.buf = *buf; - arg->reply = reply; - arg->index = index; - arg->u.xattr_set.csum = 0; - return 0; -} - static int out_xattr_set(struct tgt_session_info *tsi) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); @@ -683,62 +376,12 @@ static int out_xattr_set(struct tgt_session_info *tsi) flag = *tmp; rc = out_tx_xattr_set(tsi->tsi_env, obj, lbuf, name, flag, - &tti->tti_tea, + &tti->tti_tea, tti->tti_tea.ta_handle, tti->tti_u.update.tti_update_reply, tti->tti_u.update.tti_update_reply_index); RETURN(rc); } -static int out_tx_xattr_del_exec(const struct lu_env *env, struct thandle *th, - struct tx_arg *arg) -{ - struct dt_object *dt_obj = arg->object; - int rc; - - CDEBUG(D_INFO, "%s: del xattr name '%s' on "DFID"\n", - dt_obd_name(th->th_dev), arg->u.xattr_set.name, - PFID(lu_object_fid(&dt_obj->do_lu))); - - if (!lu_object_exists(&dt_obj->do_lu)) - GOTO(out, rc = -ENOENT); - - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_xattr_del(env, dt_obj, arg->u.xattr_set.name, th); - dt_write_unlock(env, dt_obj); -out: - CDEBUG(D_INFO, "%s: insert xattr del reply %p index %d: rc = %d\n", - dt_obd_name(th->th_dev), arg->reply, arg->index, rc); - - object_update_result_insert(arg->reply, NULL, 0, arg->index, rc); - - return rc; -} - -static int __out_tx_xattr_del(const struct lu_env *env, - struct dt_object *dt_obj, const char *name, - struct thandle_exec_args *ta, - struct object_update_reply *reply, - int index, const char *file, int line) -{ - struct tx_arg *arg; - int rc; - - rc = dt_declare_xattr_del(env, dt_obj, name, ta->ta_handle); - if (rc != 0) - return rc; - - arg = tx_add_exec(ta, out_tx_xattr_del_exec, NULL, file, line); - if (IS_ERR(arg)) - return PTR_ERR(arg); - - lu_object_get(&dt_obj->do_lu); - arg->object = dt_obj; - arg->u.xattr_set.name = name; - arg->reply = reply; - arg->index = index; - return 0; -} - static int out_xattr_del(struct tgt_session_info *tsi) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); @@ -756,84 +399,12 @@ static int out_xattr_del(struct tgt_session_info *tsi) } rc = out_tx_xattr_del(tsi->tsi_env, obj, name, &tti->tti_tea, + tti->tti_tea.ta_handle, tti->tti_u.update.tti_update_reply, tti->tti_u.update.tti_update_reply_index); RETURN(rc); } -static int out_obj_ref_add(const struct lu_env *env, - struct dt_object *dt_obj, - struct thandle *th) -{ - int rc; - - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_ref_add(env, dt_obj, th); - dt_write_unlock(env, dt_obj); - - return rc; -} - -static int out_obj_ref_del(const struct lu_env *env, - struct dt_object *dt_obj, - struct thandle *th) -{ - int rc; - - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_ref_del(env, dt_obj, th); - dt_write_unlock(env, dt_obj); - - return rc; -} - -static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th, - struct tx_arg *arg) -{ - struct dt_object *dt_obj = arg->object; - int rc; - - rc = out_obj_ref_add(env, dt_obj, th); - - CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n", - dt_obd_name(th->th_dev), arg->reply, arg->index, rc); - - object_update_result_insert(arg->reply, NULL, 0, arg->index, rc); - return rc; -} - -static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th, - struct tx_arg *arg) -{ - return out_obj_ref_del(env, arg->object, th); -} - -static int __out_tx_ref_add(const struct lu_env *env, - struct dt_object *dt_obj, - struct thandle_exec_args *ta, - struct object_update_reply *reply, - int index, const char *file, int line) -{ - struct tx_arg *arg; - int rc; - - LASSERT(ta->ta_handle != NULL); - rc = dt_declare_ref_add(env, dt_obj, ta->ta_handle); - if (rc != 0) - return rc; - - arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file, - line); - if (IS_ERR(arg)) - return PTR_ERR(arg); - - lu_object_get(&dt_obj->do_lu); - arg->object = dt_obj; - arg->reply = reply; - arg->index = index; - return 0; -} - /** * increase ref of the object **/ @@ -846,59 +417,12 @@ static int out_ref_add(struct tgt_session_info *tsi) ENTRY; rc = out_tx_ref_add(tsi->tsi_env, obj, &tti->tti_tea, + tti->tti_tea.ta_handle, tti->tti_u.update.tti_update_reply, tti->tti_u.update.tti_update_reply_index); RETURN(rc); } -static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th, - struct tx_arg *arg) -{ - struct dt_object *dt_obj = arg->object; - int rc; - - rc = out_obj_ref_del(env, dt_obj, th); - - CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n", - dt_obd_name(th->th_dev), arg->reply, arg->index, 0); - - object_update_result_insert(arg->reply, NULL, 0, arg->index, rc); - - return rc; -} - -static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th, - struct tx_arg *arg) -{ - return out_obj_ref_add(env, arg->object, th); -} - -static int __out_tx_ref_del(const struct lu_env *env, - struct dt_object *dt_obj, - struct thandle_exec_args *ta, - struct object_update_reply *reply, - int index, const char *file, int line) -{ - struct tx_arg *arg; - int rc; - - LASSERT(ta->ta_handle != NULL); - rc = dt_declare_ref_del(env, dt_obj, ta->ta_handle); - if (rc != 0) - return rc; - - arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file, - line); - if (IS_ERR(arg)) - return PTR_ERR(arg); - - lu_object_get(&dt_obj->do_lu); - arg->object = dt_obj; - arg->reply = reply; - arg->index = index; - return 0; -} - static int out_ref_del(struct tgt_session_info *tsi) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); @@ -911,118 +435,12 @@ static int out_ref_del(struct tgt_session_info *tsi) RETURN(-ENOENT); rc = out_tx_ref_del(tsi->tsi_env, obj, &tti->tti_tea, + tti->tti_tea.ta_handle, tti->tti_u.update.tti_update_reply, tti->tti_u.update.tti_update_reply_index); RETURN(rc); } -static int out_obj_index_insert(const struct lu_env *env, - struct dt_object *dt_obj, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *th) -{ - int rc; - - CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID", type %u\n", - dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)), - (char *)key, PFID(((struct dt_insert_rec *)rec)->rec_fid), - ((struct dt_insert_rec *)rec)->rec_type); - - if (dt_try_as_dir(env, dt_obj) == 0) - return -ENOTDIR; - - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_insert(env, dt_obj, rec, key, th, 0); - dt_write_unlock(env, dt_obj); - - return rc; -} - -static int out_obj_index_delete(const struct lu_env *env, - struct dt_object *dt_obj, - const struct dt_key *key, - struct thandle *th) -{ - int rc; - - CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n", - dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)), - (char *)key); - - if (dt_try_as_dir(env, dt_obj) == 0) - return -ENOTDIR; - - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_delete(env, dt_obj, key, th); - dt_write_unlock(env, dt_obj); - - return rc; -} - -static int out_tx_index_insert_exec(const struct lu_env *env, - struct thandle *th, struct tx_arg *arg) -{ - struct dt_object *dt_obj = arg->object; - int rc; - - if (unlikely(!dt_object_exists(dt_obj))) - RETURN(-ESTALE); - - rc = out_obj_index_insert(env, dt_obj, - (const struct dt_rec *)&arg->u.insert.rec, - arg->u.insert.key, th); - - CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n", - dt_obd_name(th->th_dev), arg->reply, arg->index, rc); - - object_update_result_insert(arg->reply, NULL, 0, arg->index, rc); - - return rc; -} - -static int out_tx_index_insert_undo(const struct lu_env *env, - struct thandle *th, struct tx_arg *arg) -{ - return out_obj_index_delete(env, arg->object, arg->u.insert.key, th); -} - -static int __out_tx_index_insert(const struct lu_env *env, - struct dt_object *dt_obj, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle_exec_args *ta, - struct object_update_reply *reply, - int index, const char *file, int line) -{ - struct tx_arg *arg; - int rc; - - LASSERT(ta->ta_handle != NULL); - if (dt_try_as_dir(env, dt_obj) == 0) { - rc = -ENOTDIR; - return rc; - } - - rc = dt_declare_insert(env, dt_obj, rec, key, ta->ta_handle); - if (rc != 0) - return rc; - - arg = tx_add_exec(ta, out_tx_index_insert_exec, - out_tx_index_insert_undo, file, line); - if (IS_ERR(arg)) - return PTR_ERR(arg); - - lu_object_get(&dt_obj->do_lu); - arg->object = dt_obj; - arg->reply = reply; - arg->index = index; - arg->u.insert.rec = *(const struct dt_insert_rec *)rec; - arg->u.insert.key = key; - - return 0; -} - static int out_index_insert(struct tgt_session_info *tsi) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); @@ -1074,69 +492,12 @@ static int out_index_insert(struct tgt_session_info *tsi) rc = out_tx_index_insert(tsi->tsi_env, obj, (const struct dt_rec *)rec, (const struct dt_key *)name, &tti->tti_tea, + tti->tti_tea.ta_handle, tti->tti_u.update.tti_update_reply, tti->tti_u.update.tti_update_reply_index); RETURN(rc); } -static int out_tx_index_delete_exec(const struct lu_env *env, - struct thandle *th, - struct tx_arg *arg) -{ - int rc; - - rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th); - - CDEBUG(D_INFO, "%s: delete idx insert reply %p index %d: rc = %d\n", - dt_obd_name(th->th_dev), arg->reply, arg->index, rc); - - object_update_result_insert(arg->reply, NULL, 0, arg->index, rc); - - return rc; -} - -static int out_tx_index_delete_undo(const struct lu_env *env, - struct thandle *th, - struct tx_arg *arg) -{ - CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n", - dt_obd_name(th->th_dev), -ENOTSUPP); - return -ENOTSUPP; -} - -static int __out_tx_index_delete(const struct lu_env *env, - struct dt_object *dt_obj, - const struct dt_key *key, - struct thandle_exec_args *ta, - struct object_update_reply *reply, - int index, const char *file, int line) -{ - struct tx_arg *arg; - int rc; - - if (dt_try_as_dir(env, dt_obj) == 0) { - rc = -ENOTDIR; - return rc; - } - - LASSERT(ta->ta_handle != NULL); - rc = dt_declare_delete(env, dt_obj, key, ta->ta_handle); - if (rc != 0) - return rc; - - arg = tx_add_exec(ta, out_tx_index_delete_exec, - out_tx_index_delete_undo, file, line); - if (IS_ERR(arg)) - return PTR_ERR(arg); - - lu_object_get(&dt_obj->do_lu); - arg->object = dt_obj; - arg->reply = reply; - arg->index = index; - arg->u.insert.key = key; - return 0; -} - static int out_index_delete(struct tgt_session_info *tsi) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); @@ -1156,61 +517,12 @@ static int out_index_delete(struct tgt_session_info *tsi) } rc = out_tx_index_delete(tsi->tsi_env, obj, (const struct dt_key *)name, - &tti->tti_tea, + &tti->tti_tea, tti->tti_tea.ta_handle, tti->tti_u.update.tti_update_reply, tti->tti_u.update.tti_update_reply_index); RETURN(rc); } -static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th, - struct tx_arg *arg) -{ - struct dt_object *dt_obj = arg->object; - int rc; - - rc = out_obj_destroy(env, dt_obj, th); - - CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n", - dt_obd_name(th->th_dev), arg->reply, arg->index, rc); - - object_update_result_insert(arg->reply, NULL, 0, arg->index, rc); - - RETURN(rc); -} - -static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th, - struct tx_arg *arg) -{ - CERROR("%s: not support destroy undo yet!: rc = %d\n", - dt_obd_name(th->th_dev), -ENOTSUPP); - return -ENOTSUPP; -} - -static int __out_tx_destroy(const struct lu_env *env, struct dt_object *dt_obj, - struct thandle_exec_args *ta, - struct object_update_reply *reply, - int index, const char *file, int line) -{ - struct tx_arg *arg; - int rc; - - LASSERT(ta->ta_handle != NULL); - rc = dt_declare_destroy(env, dt_obj, ta->ta_handle); - if (rc != 0) - return rc; - - arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo, - file, line); - if (IS_ERR(arg)) - return PTR_ERR(arg); - - lu_object_get(&dt_obj->do_lu); - arg->object = dt_obj; - arg->reply = reply; - arg->index = index; - return 0; -} - static int out_destroy(struct tgt_session_info *tsi) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); @@ -1231,63 +543,13 @@ static int out_destroy(struct tgt_session_info *tsi) RETURN(-ENOENT); rc = out_tx_destroy(tsi->tsi_env, obj, &tti->tti_tea, + tti->tti_tea.ta_handle, tti->tti_u.update.tti_update_reply, tti->tti_u.update.tti_update_reply_index); RETURN(rc); } -static int out_tx_write_exec(const struct lu_env *env, struct thandle *th, - struct tx_arg *arg) -{ - struct dt_object *dt_obj = arg->object; - int rc; - - CDEBUG(D_INFO, "write "DFID" pos "LPU64" buf %p, len %lu\n", - PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos, - arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len); - - dt_write_lock(env, dt_obj, MOR_TGT_CHILD); - rc = dt_record_write(env, dt_obj, &arg->u.write.buf, - &arg->u.write.pos, th); - dt_write_unlock(env, dt_obj); - - if (rc == 0) - rc = arg->u.write.buf.lb_len; - - object_update_result_insert(arg->reply, NULL, 0, arg->index, rc); - - return rc > 0 ? 0 : rc; -} - -static int __out_tx_write(const struct lu_env *env, - struct dt_object *dt_obj, - const struct lu_buf *buf, - loff_t pos, struct thandle_exec_args *ta, - struct object_update_reply *reply, - int index, const char *file, int line) -{ - struct tx_arg *arg; - int rc; - - LASSERT(ta->ta_handle != NULL); - rc = dt_declare_record_write(env, dt_obj, buf, pos, ta->ta_handle); - if (rc != 0) - return rc; - - arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line); - if (IS_ERR(arg)) - return PTR_ERR(arg); - - lu_object_get(&dt_obj->do_lu); - arg->object = dt_obj; - arg->u.write.buf = *buf; - arg->u.write.pos = pos; - arg->reply = reply; - arg->index = index; - return 0; -} - static int out_write(struct tgt_session_info *tsi) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); @@ -1323,7 +585,7 @@ static int out_write(struct tgt_session_info *tsi) pos = *tmp; rc = out_tx_write(tsi->tsi_env, obj, lbuf, pos, - &tti->tti_tea, + &tti->tti_tea, tti->tti_tea.ta_handle, tti->tti_u.update.tti_update_reply, tti->tti_u.update.tti_update_reply_index); RETURN(rc); diff --git a/lustre/target/out_lib.c b/lustre/target/out_lib.c index 89948cc..0c2c95b 100644 --- a/lustre/target/out_lib.c +++ b/lustre/target/out_lib.c @@ -32,9 +32,11 @@ #define DEBUG_SUBSYSTEM S_CLASS #include +#include #include #include #include +#include "tgt_internal.h" #define OUT_UPDATE_BUFFER_SIZE_ADD 4096 #define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1MB update size now */ @@ -401,3 +403,799 @@ int out_read_pack(const struct lu_env *env, struct object_update *update, ARRAY_SIZE(sizes), sizes, bufs); } EXPORT_SYMBOL(out_read_pack); + +static int tx_extend_args(struct thandle_exec_args *ta, int new_alloc_ta) +{ + struct tx_arg **new_ta; + int i; + int rc = 0; + + if (ta->ta_alloc_args >= new_alloc_ta) + return 0; + + OBD_ALLOC(new_ta, sizeof(*new_ta) * new_alloc_ta); + if (new_ta == NULL) + return -ENOMEM; + + for (i = 0; i < new_alloc_ta; i++) { + if (i < ta->ta_alloc_args) { + /* copy the old args to new one */ + new_ta[i] = ta->ta_args[i]; + } else { + OBD_ALLOC_PTR(new_ta[i]); + if (new_ta[i] == NULL) + GOTO(out, rc = -ENOMEM); + } + } + + /* free the old args */ + if (ta->ta_args != NULL) + OBD_FREE(ta->ta_args, sizeof(ta->ta_args[0]) * + ta->ta_alloc_args); + + ta->ta_args = new_ta; + ta->ta_alloc_args = new_alloc_ta; +out: + if (rc != 0) { + for (i = 0; i < new_alloc_ta; i++) { + if (new_ta[i] != NULL) + OBD_FREE_PTR(new_ta[i]); + } + OBD_FREE(new_ta, sizeof(*new_ta) * new_alloc_ta); + } + return rc; +} + +#define TX_ALLOC_STEP 8 +struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, + tx_exec_func_t func, tx_exec_func_t undo, + const char *file, int line) +{ + int rc; + int i; + + LASSERT(ta != NULL); + LASSERT(func != NULL); + + if (ta->ta_argno + 1 >= ta->ta_alloc_args) { + rc = tx_extend_args(ta, ta->ta_alloc_args + TX_ALLOC_STEP); + if (rc != 0) + return ERR_PTR(rc); + } + + i = ta->ta_argno; + + ta->ta_argno++; + + ta->ta_args[i]->exec_fn = func; + ta->ta_args[i]->undo_fn = undo; + ta->ta_args[i]->file = file; + ta->ta_args[i]->line = line; + + return ta->ta_args[i]; +} + +static int out_obj_destroy(const struct lu_env *env, struct dt_object *dt_obj, + struct thandle *th) +{ + int rc; + + CDEBUG(D_INFO, "%s: destroy "DFID"\n", dt_obd_name(th->th_dev), + PFID(lu_object_fid(&dt_obj->do_lu))); + + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_destroy(env, dt_obj, th); + dt_write_unlock(env, dt_obj); + + return rc; +} + +/** + * All of the xxx_undo will be used once execution failed, + * But because all of the required resource has been reserved in + * declare phase, i.e. if declare succeed, it should make sure + * the following executing phase succeed in anyway, so these undo + * should be useless for most of the time in Phase I + */ +static int out_tx_create_undo(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + int rc; + + rc = out_obj_destroy(env, arg->object, th); + if (rc != 0) + CERROR("%s: undo failure, we are doomed!: rc = %d\n", + dt_obd_name(th->th_dev), rc); + return rc; +} + +int out_tx_create_exec(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + CDEBUG(D_OTHER, "%s: create "DFID": dof %u, mode %o\n", + dt_obd_name(th->th_dev), + PFID(lu_object_fid(&arg->object->do_lu)), + arg->u.create.dof.dof_type, + arg->u.create.attr.la_mode & S_IFMT); + + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_create(env, dt_obj, &arg->u.create.attr, + &arg->u.create.hint, &arg->u.create.dof, th); + + dt_write_unlock(env, dt_obj); + + CDEBUG(D_INFO, "%s: insert create reply %p index %d: rc = %d\n", + dt_obd_name(th->th_dev), arg->reply, arg->index, rc); + + if (arg->reply != NULL) + object_update_result_insert(arg->reply, NULL, 0, arg->index, + rc); + + return rc; +} + +/** + * Add create update to thandle + * + * Declare create updates and add the update to the thandle updates + * exec array. + * + * \param [in] env execution environment + * \param [in] obj object to be created + * \param [in] attr attributes of the creation + * \param [in] parent_fid the fid of the parent + * \param [in] dof dt object format of the creation + * \param [in] ta thandle execuation args where all of updates + * of the transaction are stored + * \param [in] th thandle for this update + * \param [in] reply reply of the updates + * \param [in] index index of the reply + * \param [in] file the file name where the function is called, + * which is only for debugging purpose. + * \param [in] line the line number where the funtion is called, + * which is only for debugging purpose. + * + * \retval 0 if updates is added successfully. + * \retval negative errno if update adding fails. + */ +int out_create_add_exec(const struct lu_env *env, struct dt_object *obj, + struct lu_attr *attr, struct lu_fid *parent_fid, + struct dt_object_format *dof, + struct thandle_exec_args *ta, + struct thandle *th, + struct object_update_reply *reply, + int index, const char *file, int line) +{ + struct tx_arg *arg; + int rc; + + rc = dt_declare_create(env, obj, attr, NULL, dof, th); + if (rc != 0) + return rc; + + arg = tx_add_exec(ta, out_tx_create_exec, out_tx_create_undo, file, + line); + if (IS_ERR(arg)) + return PTR_ERR(arg); + + /* release the object in out_trans_stop */ + lu_object_get(&obj->do_lu); + arg->object = obj; + arg->u.create.attr = *attr; + if (parent_fid != NULL) + arg->u.create.fid = *parent_fid; + memset(&arg->u.create.hint, 0, sizeof(arg->u.create.hint)); + arg->u.create.dof = *dof; + arg->reply = reply; + arg->index = index; + + return 0; +} + +static int out_tx_attr_set_undo(const struct lu_env *env, + struct thandle *th, struct tx_arg *arg) +{ + CERROR("%s: attr set undo "DFID" unimplemented yet!: rc = %d\n", + dt_obd_name(th->th_dev), + PFID(lu_object_fid(&arg->object->do_lu)), -ENOTSUPP); + + return -ENOTSUPP; +} + +static int out_tx_attr_set_exec(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + CDEBUG(D_OTHER, "%s: attr set "DFID"\n", dt_obd_name(th->th_dev), + PFID(lu_object_fid(&dt_obj->do_lu))); + + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_attr_set(env, dt_obj, &arg->u.attr_set.attr, th); + dt_write_unlock(env, dt_obj); + + CDEBUG(D_INFO, "%s: insert attr_set reply %p index %d: rc = %d\n", + dt_obd_name(th->th_dev), arg->reply, arg->index, rc); + + if (arg->reply != NULL) + object_update_result_insert(arg->reply, NULL, 0, + arg->index, rc); + + return rc; +} + +int out_attr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + const struct lu_attr *attr, + struct thandle_exec_args *ta, + struct thandle *th, struct object_update_reply *reply, + int index, const char *file, int line) +{ + struct tx_arg *arg; + int rc; + + rc = dt_declare_attr_set(env, dt_obj, attr, th); + if (rc != 0) + return rc; + + arg = tx_add_exec(ta, out_tx_attr_set_exec, out_tx_attr_set_undo, + file, line); + if (IS_ERR(arg)) + return PTR_ERR(arg); + + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->u.attr_set.attr = *attr; + arg->reply = reply; + arg->index = index; + return 0; +} + +static int out_tx_write_exec(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + CDEBUG(D_INFO, "write "DFID" pos "LPU64" buf %p, len %lu\n", + PFID(lu_object_fid(&dt_obj->do_lu)), arg->u.write.pos, + arg->u.write.buf.lb_buf, (unsigned long)arg->u.write.buf.lb_len); + + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_record_write(env, dt_obj, &arg->u.write.buf, + &arg->u.write.pos, th); + dt_write_unlock(env, dt_obj); + + if (rc == 0) + rc = arg->u.write.buf.lb_len; + + if (arg->reply != NULL) + object_update_result_insert(arg->reply, NULL, 0, arg->index, + rc); + + return rc > 0 ? 0 : rc; +} + +int out_write_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + const struct lu_buf *buf, loff_t pos, + struct thandle_exec_args *ta, struct thandle *th, + struct object_update_reply *reply, int index, + const char *file, int line) +{ + struct tx_arg *arg; + int rc; + + rc = dt_declare_record_write(env, dt_obj, buf, pos, th); + if (rc != 0) + return rc; + + arg = tx_add_exec(ta, out_tx_write_exec, NULL, file, line); + if (IS_ERR(arg)) + return PTR_ERR(arg); + + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->u.write.buf = *buf; + arg->u.write.pos = pos; + arg->reply = reply; + arg->index = index; + return 0; +} + +static int out_tx_xattr_set_exec(const struct lu_env *env, + struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + CDEBUG(D_INFO, "%s: set xattr buf %p name %s flag %d\n", + dt_obd_name(th->th_dev), arg->u.xattr_set.buf.lb_buf, + arg->u.xattr_set.name, arg->u.xattr_set.flags); + + if (!lu_object_exists(&dt_obj->do_lu)) + GOTO(out, rc = -ENOENT); + + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_xattr_set(env, dt_obj, &arg->u.xattr_set.buf, + arg->u.xattr_set.name, arg->u.xattr_set.flags, + th); + /** + * Ignore errors if this is LINK EA + **/ + if (unlikely(rc != 0 && + strcmp(arg->u.xattr_set.name, XATTR_NAME_LINK) == 0)) { + /* XXX: If the linkEA is overflow, then we need to notify the + * namespace LFSCK to skip "nlink" attribute verification + * on this object to avoid the "nlink" to be shrinked by + * wrong. It may be not good an interaction with LFSCK + * like this. We will consider to replace it with other + * mechanism in future. LU-5802. */ + if (rc == -ENOSPC && arg->reply != NULL) { + struct lfsck_request *lr = &tgt_th_info(env)->tti_lr; + + lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu), + LE_SKIP_NLINK, LFSCK_TYPE_NAMESPACE); + tgt_lfsck_in_notify(env, + tgt_ses_info(env)->tsi_tgt->lut_bottom, lr, th); + } + + rc = 0; + } + dt_write_unlock(env, dt_obj); + +out: + CDEBUG(D_INFO, "%s: insert xattr set reply %p index %d: rc = %d\n", + dt_obd_name(th->th_dev), arg->reply, arg->index, rc); + + if (arg->reply != NULL) + object_update_result_insert(arg->reply, NULL, 0, arg->index, + rc); + + return rc; +} + +int out_xattr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + const struct lu_buf *buf, const char *name, + int flags, struct thandle_exec_args *ta, + struct thandle *th, + struct object_update_reply *reply, + int index, const char *file, int line) +{ + struct tx_arg *arg; + int rc; + + rc = dt_declare_xattr_set(env, dt_obj, buf, name, flags, th); + if (rc != 0) + return rc; + + if (strcmp(name, XATTR_NAME_LINK) == 0 && reply != NULL) { + struct lfsck_request *lr = &tgt_th_info(env)->tti_lr; + + /* XXX: If the linkEA is overflow, then we need to notify the + * namespace LFSCK to skip "nlink" attribute verification + * on this object to avoid the "nlink" to be shrinked by + * wrong. It may be not good an interaction with LFSCK + * like this. We will consider to replace it with other + * mechanism in future. LU-5802. */ + lfsck_pack_rfa(lr, lu_object_fid(&dt_obj->do_lu), + LE_SKIP_NLINK_DECLARE, LFSCK_TYPE_NAMESPACE); + rc = tgt_lfsck_in_notify(env, + tgt_ses_info(env)->tsi_tgt->lut_bottom, + lr, ta->ta_handle); + if (rc != 0) + return rc; + } + + arg = tx_add_exec(ta, out_tx_xattr_set_exec, NULL, file, line); + if (IS_ERR(arg)) + return PTR_ERR(arg); + + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->u.xattr_set.name = name; + arg->u.xattr_set.flags = flags; + arg->u.xattr_set.buf = *buf; + arg->reply = reply; + arg->index = index; + arg->u.xattr_set.csum = 0; + return 0; +} + +static int out_tx_xattr_del_exec(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + CDEBUG(D_INFO, "%s: del xattr name '%s' on "DFID"\n", + dt_obd_name(th->th_dev), arg->u.xattr_set.name, + PFID(lu_object_fid(&dt_obj->do_lu))); + + if (!lu_object_exists(&dt_obj->do_lu)) + GOTO(out, rc = -ENOENT); + + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_xattr_del(env, dt_obj, arg->u.xattr_set.name, + th); + dt_write_unlock(env, dt_obj); +out: + CDEBUG(D_INFO, "%s: insert xattr del reply %p index %d: rc = %d\n", + dt_obd_name(th->th_dev), arg->reply, arg->index, rc); + + if (arg->reply != NULL) + object_update_result_insert(arg->reply, NULL, 0, arg->index, + rc); + + return rc; +} + +int out_xattr_del_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + const char *name, struct thandle_exec_args *ta, + struct thandle *th, + struct object_update_reply *reply, int index, + const char *file, int line) +{ + struct tx_arg *arg; + int rc; + + rc = dt_declare_xattr_del(env, dt_obj, name, th); + if (rc != 0) + return rc; + + arg = tx_add_exec(ta, out_tx_xattr_del_exec, NULL, file, line); + if (IS_ERR(arg)) + return PTR_ERR(arg); + + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->u.xattr_set.name = name; + arg->reply = reply; + arg->index = index; + return 0; +} + +static int out_obj_ref_add(const struct lu_env *env, + struct dt_object *dt_obj, + struct thandle *th) +{ + int rc; + + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_ref_add(env, dt_obj, th); + dt_write_unlock(env, dt_obj); + + return rc; +} + +static int out_obj_ref_del(const struct lu_env *env, + struct dt_object *dt_obj, + struct thandle *th) +{ + int rc; + + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_ref_del(env, dt_obj, th); + dt_write_unlock(env, dt_obj); + + return rc; +} + +static int out_tx_ref_add_exec(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + rc = out_obj_ref_add(env, dt_obj, th); + + CDEBUG(D_INFO, "%s: insert ref_add reply %p index %d: rc = %d\n", + dt_obd_name(th->th_dev), arg->reply, arg->index, rc); + + if (arg->reply != NULL) + object_update_result_insert(arg->reply, NULL, 0, arg->index, + rc); + return rc; +} + +static int out_tx_ref_add_undo(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + return out_obj_ref_del(env, arg->object, th); +} + +int out_ref_add_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + struct thandle_exec_args *ta, + struct thandle *th, + struct object_update_reply *reply, int index, + const char *file, int line) +{ + struct tx_arg *arg; + int rc; + + rc = dt_declare_ref_add(env, dt_obj, th); + if (rc != 0) + return rc; + + arg = tx_add_exec(ta, out_tx_ref_add_exec, out_tx_ref_add_undo, file, + line); + if (IS_ERR(arg)) + return PTR_ERR(arg); + + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->reply = reply; + arg->index = index; + return 0; +} + +static int out_tx_ref_del_exec(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + rc = out_obj_ref_del(env, dt_obj, th); + + CDEBUG(D_INFO, "%s: insert ref_del reply %p index %d: rc = %d\n", + dt_obd_name(th->th_dev), arg->reply, arg->index, 0); + + if (arg->reply != NULL) + object_update_result_insert(arg->reply, NULL, 0, arg->index, + rc); + + return rc; +} + +static int out_tx_ref_del_undo(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + return out_obj_ref_add(env, arg->object, th); +} + +int out_ref_del_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + struct thandle_exec_args *ta, + struct thandle *th, + struct object_update_reply *reply, int index, + const char *file, int line) +{ + struct tx_arg *arg; + int rc; + + rc = dt_declare_ref_del(env, dt_obj, th); + if (rc != 0) + return rc; + + arg = tx_add_exec(ta, out_tx_ref_del_exec, out_tx_ref_del_undo, file, + line); + if (IS_ERR(arg)) + return PTR_ERR(arg); + + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->reply = reply; + arg->index = index; + return 0; +} + +static int out_obj_index_insert(const struct lu_env *env, + struct dt_object *dt_obj, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle *th) +{ + int rc; + + CDEBUG(D_INFO, "%s: index insert "DFID" name: %s fid "DFID", type %u\n", + dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)), + (char *)key, PFID(((struct dt_insert_rec *)rec)->rec_fid), + ((struct dt_insert_rec *)rec)->rec_type); + + if (dt_try_as_dir(env, dt_obj) == 0) + return -ENOTDIR; + + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_insert(env, dt_obj, rec, key, th, 0); + dt_write_unlock(env, dt_obj); + + return rc; +} + +static int out_obj_index_delete(const struct lu_env *env, + struct dt_object *dt_obj, + const struct dt_key *key, + struct thandle *th) +{ + int rc; + + CDEBUG(D_INFO, "%s: index delete "DFID" name: %s\n", + dt_obd_name(th->th_dev), PFID(lu_object_fid(&dt_obj->do_lu)), + (char *)key); + + if (dt_try_as_dir(env, dt_obj) == 0) + return -ENOTDIR; + + dt_write_lock(env, dt_obj, MOR_TGT_CHILD); + rc = dt_delete(env, dt_obj, key, th); + dt_write_unlock(env, dt_obj); + + return rc; +} + +static int out_tx_index_insert_exec(const struct lu_env *env, + struct thandle *th, struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + if (unlikely(!dt_object_exists(dt_obj))) + RETURN(-ESTALE); + + rc = out_obj_index_insert(env, dt_obj, + (const struct dt_rec *)&arg->u.insert.rec, + arg->u.insert.key, th); + + CDEBUG(D_INFO, "%s: insert idx insert reply %p index %d: rc = %d\n", + dt_obd_name(th->th_dev), arg->reply, arg->index, rc); + + if (arg->reply != NULL) + object_update_result_insert(arg->reply, NULL, 0, arg->index, + rc); + return rc; +} + +static int out_tx_index_insert_undo(const struct lu_env *env, + struct thandle *th, struct tx_arg *arg) +{ + return out_obj_index_delete(env, arg->object, arg->u.insert.key, th); +} + +int out_index_insert_add_exec(const struct lu_env *env, + struct dt_object *dt_obj, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle_exec_args *ta, + struct thandle *th, + struct object_update_reply *reply, + int index, const char *file, int line) +{ + struct tx_arg *arg; + int rc; + + if (dt_try_as_dir(env, dt_obj) == 0) { + rc = -ENOTDIR; + return rc; + } + + rc = dt_declare_insert(env, dt_obj, rec, key, th); + if (rc != 0) + return rc; + + arg = tx_add_exec(ta, out_tx_index_insert_exec, + out_tx_index_insert_undo, file, line); + if (IS_ERR(arg)) + return PTR_ERR(arg); + + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->reply = reply; + arg->index = index; + arg->u.insert.rec = *(const struct dt_insert_rec *)rec; + arg->u.insert.key = key; + + return 0; +} + +static int out_tx_index_delete_exec(const struct lu_env *env, + struct thandle *th, + struct tx_arg *arg) +{ + int rc; + + rc = out_obj_index_delete(env, arg->object, arg->u.insert.key, th); + + CDEBUG(D_INFO, "%s: delete idx insert reply %p index %d: rc = %d\n", + dt_obd_name(th->th_dev), arg->reply, arg->index, rc); + + if (arg->reply != NULL) + object_update_result_insert(arg->reply, NULL, 0, arg->index, + rc); + + return rc; +} + +static int out_tx_index_delete_undo(const struct lu_env *env, + struct thandle *th, + struct tx_arg *arg) +{ + CERROR("%s: Oops, can not rollback index_delete yet: rc = %d\n", + dt_obd_name(th->th_dev), -ENOTSUPP); + return -ENOTSUPP; +} + +int out_index_delete_add_exec(const struct lu_env *env, + struct dt_object *dt_obj, + const struct dt_key *key, + struct thandle_exec_args *ta, + struct thandle *th, + struct object_update_reply *reply, + int index, const char *file, int line) +{ + struct tx_arg *arg; + int rc; + + if (dt_try_as_dir(env, dt_obj) == 0) { + rc = -ENOTDIR; + return rc; + } + + LASSERT(ta->ta_handle != NULL); + rc = dt_declare_delete(env, dt_obj, key, th); + if (rc != 0) + return rc; + + arg = tx_add_exec(ta, out_tx_index_delete_exec, + out_tx_index_delete_undo, file, line); + if (IS_ERR(arg)) + return PTR_ERR(arg); + + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->reply = reply; + arg->index = index; + arg->u.insert.key = key; + return 0; +} + +static int out_tx_destroy_exec(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + struct dt_object *dt_obj = arg->object; + int rc; + + rc = out_obj_destroy(env, dt_obj, th); + + CDEBUG(D_INFO, "%s: insert destroy reply %p index %d: rc = %d\n", + dt_obd_name(th->th_dev), arg->reply, arg->index, rc); + + if (arg->reply != NULL) + object_update_result_insert(arg->reply, NULL, 0, arg->index, + rc); + + RETURN(rc); +} + +static int out_tx_destroy_undo(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg) +{ + CERROR("%s: not support destroy undo yet!: rc = %d\n", + dt_obd_name(th->th_dev), -ENOTSUPP); + return -ENOTSUPP; +} + +int out_destroy_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + struct thandle_exec_args *ta, struct thandle *th, + struct object_update_reply *reply, + int index, const char *file, int line) +{ + struct tx_arg *arg; + int rc; + + rc = dt_declare_destroy(env, dt_obj, th); + if (rc != 0) + return rc; + + arg = tx_add_exec(ta, out_tx_destroy_exec, out_tx_destroy_undo, + file, line); + if (IS_ERR(arg)) + return PTR_ERR(arg); + + lu_object_get(&dt_obj->do_lu); + arg->object = dt_obj; + arg->reply = reply; + arg->index = index; + return 0; +} diff --git a/lustre/target/tgt_internal.h b/lustre/target/tgt_internal.h index 3b52e33..603a4c2 100644 --- a/lustre/target/tgt_internal.h +++ b/lustre/target/tgt_internal.h @@ -47,58 +47,6 @@ extern int (*tgt_lfsck_in_notify)(const struct lu_env *env, struct dt_device *key, struct lfsck_request *lr, struct thandle *th); - -struct tx_arg; -typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th, - struct tx_arg *ta); - -struct tx_arg { - tx_exec_func_t exec_fn; - tx_exec_func_t undo_fn; - struct dt_object *object; - const char *file; - struct object_update_reply *reply; - int line; - int index; - union { - struct { - struct dt_insert_rec rec; - const struct dt_key *key; - } insert; - struct { - } ref; - struct { - struct lu_attr attr; - } attr_set; - struct { - struct lu_buf buf; - const char *name; - int flags; - __u32 csum; - } xattr_set; - struct { - struct lu_attr attr; - struct dt_allocation_hint hint; - struct dt_object_format dof; - struct lu_fid fid; - } create; - struct { - struct lu_buf buf; - loff_t pos; - } write; - struct { - struct ost_body *body; - } destroy; - } u; -}; - -struct thandle_exec_args { - struct thandle *ta_handle; - int ta_argno; /* used args */ - int ta_alloc_args; /* allocated args count */ - struct tx_arg **ta_args; -}; - /** * Common data shared by tg-level handlers. This is allocated per-thread to * reduce stack consumption. @@ -170,44 +118,119 @@ static inline char *dt_obd_name(struct dt_device *dt) return dt->dd_lu_dev.ld_obd->obd_name; } +/* out_lib.c */ +int out_tx_create_exec(const struct lu_env *env, struct thandle *th, + struct tx_arg *arg); +struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, + tx_exec_func_t func, tx_exec_func_t undo, + const char *file, int line); + +int out_create_add_exec(const struct lu_env *env, struct dt_object *obj, + struct lu_attr *attr, struct lu_fid *parent_fid, + struct dt_object_format *dof, + struct thandle_exec_args *ta, struct thandle *th, + struct object_update_reply *reply, + int index, const char *file, int line); + +int out_attr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + const struct lu_attr *attr, + struct thandle_exec_args *ta, struct thandle *th, + struct object_update_reply *reply, int index, + const char *file, int line); + +int out_write_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + const struct lu_buf *buf, loff_t pos, + struct thandle_exec_args *ta, struct thandle *th, + struct object_update_reply *reply, int index, + const char *file, int line); + +int out_xattr_set_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + const struct lu_buf *buf, const char *name, + int flags, struct thandle_exec_args *ta, + struct thandle *th, + struct object_update_reply *reply, int index, + const char *file, int line); + +int out_xattr_del_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + const char *name, struct thandle_exec_args *ta, + struct thandle *th, + struct object_update_reply *reply, int index, + const char *file, int line); + +int out_ref_add_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + struct thandle_exec_args *ta, struct thandle *th, + struct object_update_reply *reply, int index, + const char *file, int line); + +int out_ref_del_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + struct thandle_exec_args *ta, struct thandle *th, + struct object_update_reply *reply, int index, + const char *file, int line); + +int out_index_insert_add_exec(const struct lu_env *env, + struct dt_object *dt_obj, + const struct dt_rec *rec, + const struct dt_key *key, + struct thandle_exec_args *ta, + struct thandle *th, + struct object_update_reply *reply, + int index, const char *file, int line); + +int out_index_delete_add_exec(const struct lu_env *env, + struct dt_object *dt_obj, + const struct dt_key *key, + struct thandle_exec_args *ta, + struct thandle *th, + struct object_update_reply *reply, + int index, const char *file, int line); + +int out_destroy_add_exec(const struct lu_env *env, struct dt_object *dt_obj, + struct thandle_exec_args *ta, struct thandle *th, + struct object_update_reply *reply, + int index, const char *file, int line); + /* Update handlers */ int out_handle(struct tgt_session_info *tsi); -#define out_tx_create(info, obj, attr, fid, dof, th, reply, idx) \ - __out_tx_create(info, obj, attr, fid, dof, th, reply, idx, \ - __FILE__, __LINE__) +#define out_tx_create(env, obj, attr, fid, dof, ta, th, reply, idx) \ + out_create_add_exec(env, obj, attr, fid, dof, ta, th, reply, idx, \ + __FILE__, __LINE__) -#define out_tx_attr_set(info, obj, attr, th, reply, idx) \ - __out_tx_attr_set(info, obj, attr, th, reply, idx, \ - __FILE__, __LINE__) +#define out_tx_attr_set(env, obj, attr, ta, th, reply, idx) \ + out_attr_set_add_exec(env, obj, attr, ta, th, reply, idx, \ + __FILE__, __LINE__) -#define out_tx_xattr_set(info, obj, buf, name, fl, th, reply, idx) \ - __out_tx_xattr_set(info, obj, buf, name, fl, th, reply, idx, \ - __FILE__, __LINE__) +#define out_tx_xattr_set(env, obj, buf, name, fl, ta, th, reply, idx) \ + out_xattr_set_add_exec(env, obj, buf, name, fl, ta, th, reply, idx, \ + __FILE__, __LINE__) -#define out_tx_xattr_del(info, obj, name, th, reply, idx) \ - __out_tx_xattr_del(info, obj, name, th, reply, idx, \ - __FILE__, __LINE__) +#define out_tx_xattr_del(env, obj, name, ta, th, reply, idx) \ + out_xattr_del_add_exec(env, obj, name, ta, th, reply, idx, \ + __FILE__, __LINE__) -#define out_tx_ref_add(info, obj, th, reply, idx) \ - __out_tx_ref_add(info, obj, th, reply, idx, __FILE__, __LINE__) +#define out_tx_ref_add(env, obj, ta, th, reply, idx) \ + out_ref_add_add_exec(env, obj, ta, th, reply, idx, \ + __FILE__, __LINE__) -#define out_tx_ref_del(info, obj, th, reply, idx) \ - __out_tx_ref_del(info, obj, th, reply, idx, __FILE__, __LINE__) +#define out_tx_ref_del(env, obj, ta, th, reply, idx) \ + out_ref_del_add_exec(env, obj, ta, th, reply, idx, \ + __FILE__, __LINE__) -#define out_tx_index_insert(info, obj, rec, key, th, reply, idx) \ - __out_tx_index_insert(info, obj, rec, key, th, reply, idx, \ - __FILE__, __LINE__) +#define out_tx_index_insert(env, obj, rec, key, ta, th, reply, idx) \ + out_index_insert_add_exec(env, obj, rec, key, ta, th, reply, idx, \ + __FILE__, __LINE__) -#define out_tx_index_delete(info, obj, key, th, reply, idx) \ - __out_tx_index_delete(info, obj, key, th, reply, idx, \ - __FILE__, __LINE__) +#define out_tx_index_delete(env, obj, key, ta, th, reply, idx) \ + out_index_delete_add_exec(env, obj, key, ta, th, reply, idx, \ + __FILE__, __LINE__) -#define out_tx_destroy(info, obj, th, reply, idx) \ - __out_tx_destroy(info, obj, th, reply, idx, __FILE__, __LINE__) +#define out_tx_destroy(env, obj, ta, th, reply, idx) \ + out_destroy_add_exec(env, obj, ta, th, reply, idx, \ + __FILE__, __LINE__) -#define out_tx_write(info, obj, buf, pos, th, reply, idx) \ - __out_tx_write(info, obj, buf, pos, th, reply, idx, __FILE__, __LINE__) +#define out_tx_write(env, obj, buf, pos, ta, th, reply, idx) \ + out_write_add_exec(env, obj, buf, pos, ta, th, reply, idx,\ + __FILE__, __LINE__) const char *update_op_str(__u16 opcode); @@ -227,13 +250,15 @@ void update_records_dump(const struct update_records *records, unsigned int mask, bool dump_updates); int check_and_prepare_update_record(const struct lu_env *env, struct thandle_update_records *tur); - struct update_thread_info { struct lu_attr uti_attr; struct lu_fid uti_fid; struct lu_buf uti_buf; struct thandle_update_records uti_tur; struct obdo uti_obdo; + struct thandle_exec_args uti_tea; + struct dt_insert_rec uti_rec; + struct distribute_txn_replay_req *uti_dtrq; }; extern struct lu_context_key update_thread_key; @@ -250,4 +275,12 @@ update_env_info(const struct lu_env *env) void update_info_init(void); void update_info_fini(void); +struct sub_thandle *create_sub_thandle(struct top_multiple_thandle *tmt, + struct dt_device *dt_dev); +int sub_thandle_trans_create(const struct lu_env *env, + struct top_thandle *top_th, + struct sub_thandle *st); +void distribute_txn_insert_by_batchid(struct top_multiple_thandle *new); +int top_trans_create_tmt(const struct lu_env *env, + struct top_thandle *top_th); #endif /* _TG_INTERNAL_H */ diff --git a/lustre/target/update_records.c b/lustre/target/update_records.c index 632e071..a460e3d 100644 --- a/lustre/target/update_records.c +++ b/lustre/target/update_records.c @@ -69,7 +69,9 @@ void update_records_dump(const struct update_records *records, ops = &records->ur_ops; params = update_records_get_params(records); - CDEBUG(mask, "ops = %d params_count = %d\n", records->ur_update_count, + CDEBUG(mask, "master transno = "LPU64" batchid = "LPU64" flags = %x" + " ops = %d params = %d\n", records->ur_master_transno, + records->ur_batchid, records->ur_flags, records->ur_update_count, records->ur_param_count); if (records->ur_update_count == 0) @@ -904,6 +906,17 @@ static void update_key_fini(const struct lu_context *ctx, struct lu_context_key *key, void *data) { struct update_thread_info *info = data; + struct thandle_exec_args *args = &info->uti_tea; + int i; + + for (i = 0; i < args->ta_alloc_args; i++) { + if (args->ta_args[i] != NULL) + OBD_FREE_PTR(args->ta_args[i]); + } + + if (args->ta_args != NULL) + OBD_FREE(args->ta_args, sizeof(args->ta_args[0]) * + args->ta_alloc_args); if (info->uti_tur.tur_update_records != NULL) OBD_FREE_LARGE(info->uti_tur.tur_update_records, diff --git a/lustre/target/update_recovery.c b/lustre/target/update_recovery.c new file mode 100644 index 0000000..1b5ebca --- /dev/null +++ b/lustre/target/update_recovery.c @@ -0,0 +1,1144 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright (c) 2014, Intel Corporation. + */ + +/* + * lustre/target/update_recovery.c + * + * This file implement the methods to handle the update recovery. + * + * During DNE recovery, the recovery thread will redo the operation according + * to the transaction no, and these replay are either from client replay req + * or update replay records(for distribute transaction) in the update log. + * For distribute transaction replay, the replay thread will call + * distribute_txn_replay_handle() to handle the updates. + * + * After the Master MDT restarts, it will retrieve the update records from all + * of MDTs, for each distributed operation, it will check updates on all MDTs, + * if some updates records are missing on some MDTs, the replay thread will redo + * updates on these MDTs. + * + * Author: Di Wang + */ +#define DEBUG_SUBSYSTEM S_CLASS + +#include +#include +#include +#include +#include +#include "tgt_internal.h" + +/** + * Lookup distribute_txn_replay req + * + * Lookup distribute_txn_replay in the replay list by batchid. + * It is assumed the list has been locked before calling this function. + * + * \param[in] tdtd distribute_txn_data, which holds the replay + * list. + * \param[in] batchid batchid used by lookup. + * + * \retval pointer of the replay if succeeds. + * \retval NULL if can not find it. + */ +static struct distribute_txn_replay_req * +dtrq_lookup(struct target_distribute_txn_data *tdtd, __u64 batchid) +{ + struct distribute_txn_replay_req *tmp; + struct distribute_txn_replay_req *dtrq = NULL; + + list_for_each_entry(tmp, &tdtd->tdtd_replay_list, dtrq_list) { + if (tmp->dtrq_lur->lur_update_rec.ur_batchid == batchid) { + dtrq = tmp; + break; + } + } + return dtrq; +} + +/** + * insert distribute txn replay req + * + * Insert distribute txn replay to the replay list, and it assumes the + * list has been looked. Note: the replay list is a sorted list, which + * is sorted by master transno. It is assumed the replay list has been + * locked before calling this function. + * + * \param[in] tdtd target distribute txn data where replay list is + * \param[in] new distribute txn replay to be inserted + * + * \retval 0 if insertion succeeds + * \retval EEXIST if the dtrq already exists + */ +static int dtrq_insert(struct target_distribute_txn_data *tdtd, + struct distribute_txn_replay_req *new) +{ + struct distribute_txn_replay_req *iter; + + list_for_each_entry_reverse(iter, &tdtd->tdtd_replay_list, dtrq_list) { + if (iter->dtrq_lur->lur_update_rec.ur_master_transno > + new->dtrq_lur->lur_update_rec.ur_master_transno) + continue; + + /* If there are mulitple replay req with same transno, then + * sort them with batchid */ + if (iter->dtrq_lur->lur_update_rec.ur_master_transno == + new->dtrq_lur->lur_update_rec.ur_master_transno && + iter->dtrq_lur->lur_update_rec.ur_batchid == + new->dtrq_lur->lur_update_rec.ur_batchid) + return -EEXIST; + + /* If there are mulitple replay req with same transno, then + * sort them with batchid */ + if (iter->dtrq_lur->lur_update_rec.ur_master_transno == + new->dtrq_lur->lur_update_rec.ur_master_transno && + iter->dtrq_lur->lur_update_rec.ur_batchid > + new->dtrq_lur->lur_update_rec.ur_batchid) + continue; + + list_add(&new->dtrq_list, &iter->dtrq_list); + break; + } + + if (list_empty(&new->dtrq_list)) + list_add(&new->dtrq_list, &tdtd->tdtd_replay_list); + + return 0; +} + +/** + * create distribute txn replay req + * + * Allocate distribute txn replay req according to the update records. + * + * \param[in] tdtd target distribute txn data where replay list is. + * \param[in] record update records from the update log. + * + * \retval the pointer of distribute txn replay req if + * the creation succeeds. + * \retval NULL if the creation fails. + */ +static struct distribute_txn_replay_req * +dtrq_create(struct llog_update_record *lur) +{ + struct distribute_txn_replay_req *new; + + OBD_ALLOC_PTR(new); + if (new == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + new->dtrq_lur_size = llog_update_record_size(lur); + OBD_ALLOC_LARGE(new->dtrq_lur, new->dtrq_lur_size); + if (new->dtrq_lur == NULL) { + OBD_FREE_PTR(new); + RETURN(ERR_PTR(-ENOMEM)); + } + + memcpy(new->dtrq_lur, lur, new->dtrq_lur_size); + + spin_lock_init(&new->dtrq_sub_list_lock); + INIT_LIST_HEAD(&new->dtrq_sub_list); + INIT_LIST_HEAD(&new->dtrq_list); + + RETURN(new); +} + +/** + * Lookup distribute sub replay + * + * Lookup distribute sub replay in the sub list of distribute_txn_replay by + * mdt_index. + * + * \param[in] distribute_txn_replay_req the distribute txn replay req to lookup + * \param[in] mdt_index the mdt_index as the key of lookup + * + * \retval the pointer of sub replay if it can be found. + * \retval NULL if it can not find. + */ +struct distribute_txn_replay_req_sub * +dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index) +{ + struct distribute_txn_replay_req_sub *dtrqs = NULL; + struct distribute_txn_replay_req_sub *tmp; + + list_for_each_entry(tmp, &dtrq->dtrq_sub_list, dtrqs_list) { + if (tmp->dtrqs_mdt_index == mdt_index) { + dtrqs = tmp; + break; + } + } + return dtrqs; +} + +/** + * Insert distribute txn sub req replay + * + * Allocate sub replay req and insert distribute txn replay list. + * + * \param[in] dtrq d to be added + * \param[in] cookie the cookie of the update record + * \param[in] mdt_index the mdt_index of the update record + * + * \retval 0 if the adding succeeds. + * \retval negative errno if the adding fails. + */ +static int +dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq, + struct llog_cookie *cookie, + __u32 mdt_index) +{ + struct distribute_txn_replay_req_sub *dtrqs = NULL; + struct distribute_txn_replay_req_sub *new; + ENTRY; + + spin_lock(&dtrq->dtrq_sub_list_lock); + dtrqs = dtrq_sub_lookup(dtrq, mdt_index); + spin_unlock(&dtrq->dtrq_sub_list_lock); + if (dtrqs != NULL) + RETURN(0); + + OBD_ALLOC_PTR(new); + if (new == NULL) + RETURN(-ENOMEM); + + INIT_LIST_HEAD(&new->dtrqs_list); + new->dtrqs_mdt_index = mdt_index; + new->dtrqs_llog_cookie = *cookie; + spin_lock(&dtrq->dtrq_sub_list_lock); + dtrqs = dtrq_sub_lookup(dtrq, mdt_index); + if (dtrqs == NULL) + list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list); + else + OBD_FREE_PTR(new); + spin_unlock(&dtrq->dtrq_sub_list_lock); + + RETURN(0); +} + +/** + * Insert update records to the replay list. + * + * Allocate distribute txn replay req and insert it into the replay + * list, then insert the update records into the replay req. + * + * \param[in] tdtd distribute txn replay data where the replay list + * is. + * \param[in] record the update record + * \param[in] cookie cookie of the record + * \param[in] index mdt index of the record + * + * \retval 0 if the adding succeeds. + * \retval negative errno if the adding fails. + */ +int +insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd, + struct llog_update_record *lur, + struct llog_cookie *cookie, + __u32 mdt_index) +{ + struct distribute_txn_replay_req *dtrq; + struct update_records *record = &lur->lur_update_rec; + int rc = 0; + ENTRY; + + CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64 + " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name, + record->ur_batchid, record->ur_master_transno, mdt_index); +again: + spin_lock(&tdtd->tdtd_replay_list_lock); + dtrq = dtrq_lookup(tdtd, record->ur_batchid); + spin_unlock(&tdtd->tdtd_replay_list_lock); + if (dtrq == NULL) { + /* If the transno in the update record is 0, it means the + * update are from master MDT, and we will use the master + * last committed transno as its batchid. Note: if it got + * the records from the slave later, it needs to update + * the batchid by the transno in slave update log (see below) */ + dtrq = dtrq_create(lur); + if (IS_ERR(dtrq)) + RETURN(PTR_ERR(dtrq)); + + if (record->ur_master_transno == 0) + dtrq->dtrq_lur->lur_update_rec.ur_master_transno = + tdtd->tdtd_lut->lut_last_transno; + spin_lock(&tdtd->tdtd_replay_list_lock); + rc = dtrq_insert(tdtd, dtrq); + spin_unlock(&tdtd->tdtd_replay_list_lock); + } else if (record->ur_master_transno != 0 && + dtrq->dtrq_lur->lur_update_rec.ur_master_transno != + record->ur_master_transno) { + /* If the master transno in update header is not matched with + * the one in the record, then it means the dtrq is originally + * created by master record, and we need update master transno + * and reposition the dtrq(by master transno). */ + dtrq->dtrq_lur->lur_update_rec.ur_master_transno = + record->ur_master_transno; + list_del_init(&dtrq->dtrq_list); + spin_lock(&tdtd->tdtd_replay_list_lock); + rc = dtrq_insert(tdtd, dtrq); + spin_unlock(&tdtd->tdtd_replay_list_lock); + } + + if (rc == -EEXIST) { + dtrq_destory(dtrq); + rc = 0; + goto again; + } + + rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index); + + RETURN(rc); +} +EXPORT_SYMBOL(insert_update_records_to_replay_list); + +/** + * Dump updates of distribute txns. + * + * Output all of recovery updates in the distribute txn list to the + * debug log. + * + * \param[in] tdtd distribute txn data where all of distribute txn + * are listed. + * \param[in] mask debug mask + */ +void dtrq_list_dump(struct target_distribute_txn_data *tdtd, unsigned int mask) +{ + struct distribute_txn_replay_req *dtrq; + + spin_lock(&tdtd->tdtd_replay_list_lock); + list_for_each_entry(dtrq, &tdtd->tdtd_replay_list, dtrq_list) + update_records_dump(&dtrq->dtrq_lur->lur_update_rec, mask, + false); + spin_unlock(&tdtd->tdtd_replay_list_lock); +} +EXPORT_SYMBOL(dtrq_list_dump); + +/** + * Destroy distribute txn replay req + * + * Destroy distribute txn replay req and all of subs. + * + * \param[in] dtrq distribute txn replqy req to be destroyed. + */ +void dtrq_destory(struct distribute_txn_replay_req *dtrq) +{ + struct distribute_txn_replay_req_sub *dtrqs; + struct distribute_txn_replay_req_sub *tmp; + + LASSERT(list_empty(&dtrq->dtrq_list)); + spin_lock(&dtrq->dtrq_sub_list_lock); + list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) { + list_del(&dtrqs->dtrqs_list); + OBD_FREE_PTR(dtrqs); + } + spin_unlock(&dtrq->dtrq_sub_list_lock); + + if (dtrq->dtrq_lur != NULL) + OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size); + + OBD_FREE_PTR(dtrq); +} +EXPORT_SYMBOL(dtrq_destory); + +/** + * Destroy all of replay req. + * + * Destroy all of replay req in the replay list. + * + * \param[in] tdtd target distribute txn data where the replay list is. + */ +void dtrq_list_destroy(struct target_distribute_txn_data *tdtd) +{ + struct distribute_txn_replay_req *dtrq; + struct distribute_txn_replay_req *tmp; + + spin_lock(&tdtd->tdtd_replay_list_lock); + list_for_each_entry_safe(dtrq, tmp, &tdtd->tdtd_replay_list, + dtrq_list) { + list_del_init(&dtrq->dtrq_list); + dtrq_destory(dtrq); + } + spin_unlock(&tdtd->tdtd_replay_list_lock); +} +EXPORT_SYMBOL(dtrq_list_destroy); + +/** + * Get next req in the replay list + * + * Get next req needs to be replayed, since it is a sorted list + * (by master MDT transno) + * + * \param[in] tdtd distribute txn data where the replay list is + * + * \retval the pointer of update recovery header + */ +struct distribute_txn_replay_req * +distribute_txn_get_next_req(struct target_distribute_txn_data *tdtd) +{ + struct distribute_txn_replay_req *dtrq = NULL; + + spin_lock(&tdtd->tdtd_replay_list_lock); + if (!list_empty(&tdtd->tdtd_replay_list)) { + dtrq = list_entry(tdtd->tdtd_replay_list.next, + struct distribute_txn_replay_req, dtrq_list); + list_del_init(&dtrq->dtrq_list); + } + spin_unlock(&tdtd->tdtd_replay_list_lock); + + return dtrq; +} +EXPORT_SYMBOL(distribute_txn_get_next_req); + +/** + * Get next transno in the replay list, because this is the sorted + * list, so it will return the transno of next req in the list. + * + * \param[in] tdtd distribute txn data where the replay list is + * + * \retval the transno of next update in the list + */ +__u64 distribute_txn_get_next_transno(struct target_distribute_txn_data *tdtd) +{ + struct distribute_txn_replay_req *dtrq = NULL; + __u64 transno = 0; + + spin_lock(&tdtd->tdtd_replay_list_lock); + if (!list_empty(&tdtd->tdtd_replay_list)) { + dtrq = list_entry(tdtd->tdtd_replay_list.next, + struct distribute_txn_replay_req, dtrq_list); + transno = dtrq->dtrq_lur->lur_update_rec.ur_master_transno; + } + spin_unlock(&tdtd->tdtd_replay_list_lock); + + CDEBUG(D_HA, "%s: Next update transno "LPU64"\n", + tdtd->tdtd_lut->lut_obd->obd_name, transno); + return transno; +} +EXPORT_SYMBOL(distribute_txn_get_next_transno); + +/** + * Check if the update of one object is committed + * + * Check whether the update for the object is committed by checking whether + * the correspondent sub exists in the replay req. If it is committed, mark + * the committed flag in correspondent the sub thandle. + * + * \param[in] env execution environment + * \param[in] dtrq replay request + * \param[in] dt_obj object for the update + * \param[in] top_th top thandle + * \param[in] sub_th sub thandle which the update belongs to + * + * \retval 1 if the update is not committed. + * \retval 0 if the update is committed. + * \retval negative errno if some other failures happen. + */ +static int update_is_committed(const struct lu_env *env, + struct distribute_txn_replay_req *dtrq, + struct dt_object *dt_obj, + struct top_thandle *top_th, + struct sub_thandle *st) +{ + struct seq_server_site *seq_site; + const struct lu_fid *fid = lu_object_fid(&dt_obj->do_lu); + struct distribute_txn_replay_req_sub *dtrqs; + __u32 mdt_index; + ENTRY; + + if (st->st_sub_th != NULL) + RETURN(1); + + if (st->st_committed) + RETURN(0); + + seq_site = lu_site2seq(dt_obj->do_lu.lo_dev->ld_site); + if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) { + mdt_index = fid_oid(fid); + } else if (!fid_seq_in_fldb(fid_seq(fid))) { + mdt_index = seq_site->ss_node_id; + } else { + struct lu_server_fld *fld; + struct lu_seq_range range = {0}; + int rc; + + fld = seq_site->ss_server_fld; + fld_range_set_type(&range, LU_SEQ_RANGE_MDT); + LASSERT(fld->lsf_seq_lookup != NULL); + rc = fld->lsf_seq_lookup(env, fld, fid_seq(fid), + &range); + if (rc < 0) + RETURN(rc); + mdt_index = range.lsr_index; + } + + dtrqs = dtrq_sub_lookup(dtrq, mdt_index); + if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) { + st->st_committed = 1; + if (dtrqs != NULL) + st->st_cookie = dtrqs->dtrqs_llog_cookie; + RETURN(0); + } + + CDEBUG(D_HA, "Update of "DFID "on MDT%u is not committed\n", PFID(fid), + mdt_index); + + RETURN(1); +} + +/** + * Implementation of different update methods for update recovery. + * + * These following functions update_recovery_$(update_name) implement + * different updates recovery methods. They will extract the parameters + * from the common parameters area and call correspondent dt API to redo + * the update. + * + * \param[in] env execution environment + * \param[in] op update operation to be replayed + * \param[in] params common update parameters which holds all parameters + * of the operation + * \param[in] th transaction handle + * \param[in] declare indicate it will do declare or real execution, true + * means declare, false means real execution + * + * \retval 0 if it succeeds. + * \retval negative errno if it fails. + */ +static int update_recovery_create(const struct lu_env *env, + struct dt_object *dt_obj, + const struct update_op *op, + const struct update_params *params, + struct thandle_exec_args *ta, + struct thandle *th) +{ + struct update_thread_info *uti = update_env_info(env); + struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur; + struct lu_attr *attr = &uti->uti_attr; + struct obdo *wobdo; + struct obdo *lobdo = &uti->uti_obdo; + struct dt_object_format dof; + __u16 size; + unsigned int param_count; + int rc; + ENTRY; + + if (dt_object_exists(dt_obj)) + RETURN(-EEXIST); + + param_count = lur->lur_update_rec.ur_param_count; + wobdo = update_params_get_param_buf(params, op->uop_params_off[0], + param_count, &size); + if (wobdo == NULL) + RETURN(-EIO); + if (size != sizeof(*wobdo)) + RETURN(-EIO); + + if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr)) + lustre_swab_obdo(wobdo); + + lustre_get_wire_obdo(NULL, lobdo, wobdo); + la_from_obdo(attr, lobdo, lobdo->o_valid); + + dof.dof_type = dt_mode_to_dft(attr->la_mode); + + rc = out_tx_create(env, dt_obj, attr, NULL, &dof, + ta, th, NULL, 0); + + RETURN(rc); +} + +static int update_recovery_destroy(const struct lu_env *env, + struct dt_object *dt_obj, + const struct update_op *op, + const struct update_params *params, + struct thandle_exec_args *ta, + struct thandle *th) +{ + int rc; + ENTRY; + + rc = out_tx_destroy(env, dt_obj, ta, th, NULL, 0); + + RETURN(rc); +} + +static int update_recovery_ref_add(const struct lu_env *env, + struct dt_object *dt_obj, + const struct update_op *op, + const struct update_params *params, + struct thandle_exec_args *ta, + struct thandle *th) +{ + int rc; + ENTRY; + + rc = out_tx_ref_add(env, dt_obj, ta, th, NULL, 0); + + RETURN(rc); +} + +static int update_recovery_ref_del(const struct lu_env *env, + struct dt_object *dt_obj, + const struct update_op *op, + const struct update_params *params, + struct thandle_exec_args *ta, + struct thandle *th) +{ + int rc; + ENTRY; + + rc = out_tx_ref_del(env, dt_obj, ta, th, NULL, 0); + + RETURN(rc); +} + +static int update_recovery_attr_set(const struct lu_env *env, + struct dt_object *dt_obj, + const struct update_op *op, + const struct update_params *params, + struct thandle_exec_args *ta, + struct thandle *th) +{ + struct update_thread_info *uti = update_env_info(env); + struct llog_update_record *lur = uti->uti_dtrq->dtrq_lur; + struct obdo *wobdo; + struct obdo *lobdo = &uti->uti_obdo; + struct lu_attr *attr = &uti->uti_attr; + __u16 size; + unsigned int param_count; + int rc; + ENTRY; + + param_count = lur->lur_update_rec.ur_param_count; + wobdo = update_params_get_param_buf(params, op->uop_params_off[0], + param_count, &size); + if (wobdo == NULL) + RETURN(-EIO); + if (size != sizeof(*wobdo)) + RETURN(-EIO); + + if (LLOG_REC_HDR_NEEDS_SWABBING(&lur->lur_hdr)) + lustre_swab_obdo(wobdo); + + lustre_get_wire_obdo(NULL, lobdo, wobdo); + la_from_obdo(attr, lobdo, lobdo->o_valid); + + rc = out_tx_attr_set(env, dt_obj, attr, ta, th, NULL, 0); + + RETURN(rc); +} + +static int update_recovery_xattr_set(const struct lu_env *env, + struct dt_object *dt_obj, + const struct update_op *op, + const struct update_params *params, + struct thandle_exec_args *ta, + struct thandle *th) +{ + struct update_thread_info *uti = update_env_info(env); + char *buf; + char *name; + int fl; + __u16 size; + __u32 param_count; + int rc; + ENTRY; + + param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count; + name = update_params_get_param_buf(params, + op->uop_params_off[0], + param_count, &size); + if (name == NULL) + RETURN(-EIO); + + buf = update_params_get_param_buf(params, + op->uop_params_off[1], + param_count, &size); + if (buf == NULL) + RETURN(-EIO); + + uti->uti_buf.lb_buf = buf; + uti->uti_buf.lb_len = (size_t)size; + + buf = update_params_get_param_buf(params, op->uop_params_off[2], + param_count, &size); + if (buf == NULL) + RETURN(-EIO); + if (size != sizeof(fl)) + RETURN(-EIO); + + fl = le32_to_cpu(*(int *)buf); + + rc = out_tx_xattr_set(env, dt_obj, &uti->uti_buf, name, fl, ta, th, + NULL, 0); + + RETURN(rc); +} + +static int update_recovery_index_insert(const struct lu_env *env, + struct dt_object *dt_obj, + const struct update_op *op, + const struct update_params *params, + struct thandle_exec_args *ta, + struct thandle *th) +{ + struct update_thread_info *uti = update_env_info(env); + struct lu_fid *fid; + char *name; + __u32 param_count; + __u32 *ptype; + __u32 type; + __u16 size; + int rc; + ENTRY; + + param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count; + name = update_params_get_param_buf(params, op->uop_params_off[0], + param_count, &size); + if (name == NULL) + RETURN(-EIO); + + fid = update_params_get_param_buf(params, op->uop_params_off[1], + param_count, &size); + if (fid == NULL) + RETURN(-EIO); + if (size != sizeof(*fid)) + RETURN(-EIO); + + fid_le_to_cpu(fid, fid); + + ptype = update_params_get_param_buf(params, op->uop_params_off[2], + param_count, &size); + if (ptype == NULL) + RETURN(-EIO); + if (size != sizeof(*ptype)) + RETURN(-EIO); + type = le32_to_cpu(*ptype); + + if (dt_try_as_dir(env, dt_obj) == 0) + RETURN(-ENOTDIR); + + uti->uti_rec.rec_fid = fid; + uti->uti_rec.rec_type = type; + + rc = out_tx_index_insert(env, dt_obj, + (const struct dt_rec *)&uti->uti_rec, + (const struct dt_key *)name, ta, th, + NULL, 0); + + RETURN(rc); +} + +static int update_recovery_index_delete(const struct lu_env *env, + struct dt_object *dt_obj, + const struct update_op *op, + const struct update_params *params, + struct thandle_exec_args *ta, + struct thandle *th) +{ + struct update_thread_info *uti = update_env_info(env); + __u32 param_count; + char *name; + __u16 size; + int rc; + ENTRY; + + param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count; + name = update_params_get_param_buf(params, op->uop_params_off[0], + param_count, &size); + if (name == NULL) + RETURN(-EIO); + + if (dt_try_as_dir(env, dt_obj) == 0) + RETURN(-ENOTDIR); + + rc = out_tx_index_delete(env, dt_obj, + (const struct dt_key *)name, ta, th, NULL, 0); + + RETURN(rc); +} + +static int update_recovery_write(const struct lu_env *env, + struct dt_object *dt_obj, + const struct update_op *op, + const struct update_params *params, + struct thandle_exec_args *ta, + struct thandle *th) +{ + struct update_thread_info *uti = update_env_info(env); + char *buf; + __u32 param_count; + __u64 pos; + __u16 size; + int rc; + ENTRY; + + param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count; + buf = update_params_get_param_buf(params, op->uop_params_off[0], + param_count, &size); + if (buf == NULL) + RETURN(-EIO); + + uti->uti_buf.lb_buf = buf; + uti->uti_buf.lb_len = size; + + buf = update_params_get_param_buf(params, op->uop_params_off[1], + param_count, &size); + if (buf == NULL) + RETURN(-EIO); + + pos = le64_to_cpu(*(__u64 *)buf); + + rc = out_tx_write(env, dt_obj, &uti->uti_buf, pos, + ta, th, NULL, 0); + + RETURN(rc); +} + +static int update_recovery_xattr_del(const struct lu_env *env, + struct dt_object *dt_obj, + const struct update_op *op, + const struct update_params *params, + struct thandle_exec_args *ta, + struct thandle *th) +{ + struct update_thread_info *uti = update_env_info(env); + __u32 param_count; + char *name; + __u16 size; + int rc; + ENTRY; + + param_count = uti->uti_dtrq->dtrq_lur->lur_update_rec.ur_param_count; + name = update_params_get_param_buf(params, op->uop_params_off[0], + param_count, &size); + if (name == NULL) + RETURN(-EIO); + + rc = out_tx_xattr_del(env, dt_obj, name, ta, th, NULL, 0); + + RETURN(rc); +} + +/** + * Execute updates in the update replay records + * + * Declare distribute txn replay by update records and add the updates + * to the execution list. Note: it will check if the update has been + * committed, and only execute the updates if it is not committed to + * disk. + * + * \param[in] env execution environment + * \param[in] tdtd distribute txn replay data which hold all of replay + * reqs and all replay parameters. + * \param[in] dtrq distribute transaction replay req. + * \param[in] ta thandle execute args. + * + * \retval 0 if declare succeeds. + * \retval negative errno if declare fails. + */ +static int update_recovery_exec(const struct lu_env *env, + struct target_distribute_txn_data *tdtd, + struct distribute_txn_replay_req *dtrq, + struct thandle_exec_args *ta) +{ + struct llog_update_record *lur = dtrq->dtrq_lur; + struct update_records *records = &lur->lur_update_rec; + struct update_ops *ops = &records->ur_ops; + struct update_params *params = update_records_get_params(records); + struct top_thandle *top_th = container_of(ta->ta_handle, + struct top_thandle, + tt_super); + struct top_multiple_thandle *tmt = top_th->tt_multiple_thandle; + struct update_op *op; + unsigned int i; + int rc = 0; + ENTRY; + + /* These records have been swabbed in llog_cat_process() */ + for (i = 0, op = &ops->uops_op[0]; i < records->ur_update_count; + i++, op = update_op_next_op(op)) { + struct lu_fid *fid = &op->uop_fid; + struct dt_object *dt_obj; + struct dt_object *sub_dt_obj; + struct dt_device *sub_dt; + struct sub_thandle *st; + + dt_obj = dt_locate(env, tdtd->tdtd_dt, fid); + if (IS_ERR(dt_obj)) { + rc = PTR_ERR(dt_obj); + break; + } + sub_dt_obj = dt_object_child(dt_obj); + + /* Create sub thandle if not */ + sub_dt = lu2dt_dev(sub_dt_obj->do_lu.lo_dev); + st = lookup_sub_thandle(tmt, sub_dt); + if (st == NULL) { + st = create_sub_thandle(tmt, sub_dt); + if (IS_ERR(st)) + GOTO(next, rc = PTR_ERR(st)); + } + + /* check if updates on the OSD/OSP are committed */ + rc = update_is_committed(env, dtrq, dt_obj, top_th, st); + if (rc == 0) + /* If this is committed, goto next */ + goto next; + + if (rc < 0) + GOTO(next, rc); + + /* Create thandle for sub thandle if needed */ + if (st->st_sub_th == NULL) { + rc = sub_thandle_trans_create(env, top_th, st); + if (rc != 0) + GOTO(next, rc); + } + + CDEBUG(D_HA, "replay %uth update\n", i); + switch (op->uop_type) { + case OUT_CREATE: + rc = update_recovery_create(env, sub_dt_obj, + op, params, ta, + st->st_sub_th); + break; + case OUT_DESTROY: + rc = update_recovery_destroy(env, sub_dt_obj, + op, params, ta, + st->st_sub_th); + break; + case OUT_REF_ADD: + rc = update_recovery_ref_add(env, sub_dt_obj, + op, params, ta, + st->st_sub_th); + break; + case OUT_REF_DEL: + rc = update_recovery_ref_del(env, sub_dt_obj, + op, params, ta, + st->st_sub_th); + break; + case OUT_ATTR_SET: + rc = update_recovery_attr_set(env, sub_dt_obj, + op, params, ta, + st->st_sub_th); + break; + case OUT_XATTR_SET: + rc = update_recovery_xattr_set(env, sub_dt_obj, + op, params, ta, + st->st_sub_th); + break; + case OUT_INDEX_INSERT: + rc = update_recovery_index_insert(env, sub_dt_obj, + op, params, ta, + st->st_sub_th); + break; + case OUT_INDEX_DELETE: + rc = update_recovery_index_delete(env, sub_dt_obj, + op, params, ta, + st->st_sub_th); + break; + case OUT_WRITE: + rc = update_recovery_write(env, sub_dt_obj, + op, params, ta, + st->st_sub_th); + break; + case OUT_XATTR_DEL: + rc = update_recovery_xattr_del(env, sub_dt_obj, + op, params, ta, + st->st_sub_th); + break; + default: + CERROR("Unknown update type %u\n", (__u32)op->uop_type); + rc = -EINVAL; + break; + } +next: + lu_object_put(env, &dt_obj->do_lu); + if (rc < 0) + break; + } + + ta->ta_handle->th_result = rc; + RETURN(rc); +} + +/** + * redo updates on MDT if needed. + * + * During DNE recovery, the recovery thread (target_recovery_thread) will call + * this function to replay distribute txn updates on all MDTs. It only replay + * updates on the MDT where the update record is missing. + * + * If the update already exists on the MDT, then it does not need replay the + * updates on that MDT, and only mark the sub transaction has been committed + * there. + * + * \param[in] env execution environment + * \param[in] tdtd target distribute txn data, which holds the replay list + * and all parameters needed by replay process. + * \param[in] dtrq distribute txn replay req. + * + * \retval 0 if replay succeeds. + * \retval negative errno if replay failes. + */ +int distribute_txn_replay_handle(struct lu_env *env, + struct target_distribute_txn_data *tdtd, + struct distribute_txn_replay_req *dtrq) +{ + struct update_records *records = &dtrq->dtrq_lur->lur_update_rec; + struct thandle_exec_args *ta; + struct lu_context session_env; + struct thandle *th = NULL; + struct top_thandle *top_th; + struct top_multiple_thandle *tmt; + struct thandle_update_records *tur = NULL; + unsigned int i; + int rc = 0; + ENTRY; + + /* initialize session, it is needed for the handler of target */ + rc = lu_context_init(&session_env, LCT_SERVER_SESSION | LCT_NOREF); + if (rc) { + CERROR("%s: failure to initialize session: rc = %d\n", + tdtd->tdtd_lut->lut_obd->obd_name, rc); + RETURN(rc); + } + lu_context_enter(&session_env); + env->le_ses = &session_env; + lu_env_refill(env); + update_records_dump(records, D_HA, true); + th = top_trans_create(env, NULL); + if (IS_ERR(th)) + GOTO(exit_session, rc = PTR_ERR(th)); + + ta = &update_env_info(env)->uti_tea; + ta->ta_argno = 0; + + update_env_info(env)->uti_dtrq = dtrq; + /* Create distribute transaction structure for this top thandle */ + top_th = container_of(th, struct top_thandle, tt_super); + rc = top_trans_create_tmt(env, top_th); + if (rc < 0) + GOTO(stop_trans, rc); + + ta->ta_handle = th; + + /* check if the distribute transaction has been committed */ + tmt = top_th->tt_multiple_thandle; + tmt->tmt_master_sub_dt = tdtd->tdtd_lut->lut_bottom; + tmt->tmt_batchid = records->ur_batchid; + tgt_th_info(env)->tti_transno = records->ur_master_transno; + + if (tmt->tmt_batchid <= tdtd->tdtd_committed_batchid) + tmt->tmt_committed = 1; + + rc = update_recovery_exec(env, tdtd, dtrq, ta); + if (rc < 0) + GOTO(stop_trans, rc); + + /* If no updates are needed to be replayed, then + * mark this records as committed, so commit thread + * distribute_txn_commit_thread() will delete the + * record */ + if (ta->ta_argno == 0) + tmt->tmt_committed = 1; + + tur = &update_env_info(env)->uti_tur; + tur->tur_update_records = dtrq->dtrq_lur; + tur->tur_update_records_buf_size = dtrq->dtrq_lur_size; + tur->tur_update_params = NULL; + tur->tur_update_param_count = 0; + tmt->tmt_update_records = tur; + + distribute_txn_insert_by_batchid(tmt); + rc = top_trans_start(env, NULL, th); + if (rc < 0) + GOTO(stop_trans, rc); + + for (i = 0; i < ta->ta_argno; i++) { + struct tx_arg *ta_arg; + struct dt_object *dt_obj; + struct dt_device *sub_dt; + struct sub_thandle *st; + + ta_arg = ta->ta_args[i]; + dt_obj = ta_arg->object; + + LASSERT(tmt->tmt_committed == 0); + sub_dt = lu2dt_dev(dt_obj->do_lu.lo_dev); + st = lookup_sub_thandle(tmt, sub_dt); + LASSERT(st != NULL); + LASSERT(st->st_sub_th != NULL); + rc = ta->ta_args[i]->exec_fn(env, st->st_sub_th, + ta->ta_args[i]); + if (unlikely(rc < 0)) { + CDEBUG(D_HA, "error during execution of #%u from" + " %s:%d: rc = %d\n", i, ta->ta_args[i]->file, + ta->ta_args[i]->line, rc); + while (--i >= 0) { + if (ta->ta_args[i]->undo_fn != NULL) { + dt_obj = ta->ta_args[i]->object; + sub_dt = + lu2dt_dev(dt_obj->do_lu.lo_dev); + st = lookup_sub_thandle(tmt, sub_dt); + LASSERT(st != NULL); + LASSERT(st->st_sub_th != NULL); + + ta->ta_args[i]->undo_fn(env, + st->st_sub_th, + ta->ta_args[i]); + } else { + CERROR("%s: undo for %s:%d: rc = %d\n", + dt_obd_name(ta->ta_handle->th_dev), + ta->ta_args[i]->file, + ta->ta_args[i]->line, -ENOTSUPP); + } + } + break; + } + CDEBUG(D_HA, "%s: executed %u/%u: rc = %d\n", + dt_obd_name(sub_dt), i, ta->ta_argno, rc); + } + +stop_trans: + if (rc < 0) + th->th_result = rc; + rc = top_trans_stop(env, tdtd->tdtd_dt, th); + for (i = 0; i < ta->ta_argno; i++) { + if (ta->ta_args[i]->object != NULL) { + lu_object_put(env, &ta->ta_args[i]->object->do_lu); + ta->ta_args[i]->object = NULL; + } + } + + if (tur != NULL) + tur->tur_update_records = NULL; +exit_session: + lu_context_exit(&session_env); + lu_context_fini(&session_env); + RETURN(rc); +} +EXPORT_SYMBOL(distribute_txn_replay_handle); diff --git a/lustre/target/update_trans.c b/lustre/target/update_trans.c index 7ff50cf..e56c1ec 100644 --- a/lustre/target/update_trans.c +++ b/lustre/target/update_trans.c @@ -315,33 +315,6 @@ struct sub_thandle *create_sub_thandle(struct top_multiple_thandle *tmt, } /** - * Create sub thandle - * - * Create transaction handle for sub_thandle - * - * \param[in] env execution environment - * \param[in] th top thandle - * \param[in] st sub_thandle - * - * \retval 0 if creation succeeds. - * \retval negative errno if creation fails. - */ -int sub_thandle_trans_create(const struct lu_env *env, - struct top_thandle *top_th, - struct sub_thandle *st) -{ - struct thandle *sub_th; - - sub_th = dt_trans_create(env, st->st_dt); - if (IS_ERR(sub_th)) - return PTR_ERR(sub_th); - - sub_th->th_top = &top_th->tt_super; - st->st_sub_th = sub_th; - return 0; -} - -/** * sub thandle commit callback * * Mark the sub thandle to be committed and if all sub thandle are committed @@ -349,7 +322,8 @@ int sub_thandle_trans_create(const struct lu_env *env, * * \param[in] sub_th sub thandle being committed. */ -static void sub_trans_commit_cb(struct lu_env *env, struct thandle *sub_th, +static void sub_trans_commit_cb(struct lu_env *env, + struct thandle *sub_th, struct dt_txn_commit_cb *cb, int err) { struct sub_thandle *st; @@ -390,6 +364,36 @@ static void sub_thandle_register_commit_cb(struct sub_thandle *st, } /** + * Create sub thandle + * + * Create transaction handle for sub_thandle + * + * \param[in] env execution environment + * \param[in] th top thandle + * \param[in] st sub_thandle + * + * \retval 0 if creation succeeds. + * \retval negative errno if creation fails. + */ +int sub_thandle_trans_create(const struct lu_env *env, + struct top_thandle *top_th, + struct sub_thandle *st) +{ + struct thandle *sub_th; + + sub_th = dt_trans_create(env, st->st_dt); + if (IS_ERR(sub_th)) + return PTR_ERR(sub_th); + + sub_th->th_top = &top_th->tt_super; + st->st_sub_th = sub_th; + + sub_th->th_wait_submit = 1; + return 0; +} + + +/** * Create the top transaction. * * Create the top transaction on the master device. It will create a top @@ -469,7 +473,6 @@ static int declare_updates_write(const struct lu_env *env, * Assign batchid to the distribute transaction * * \param[in] tmt distribute transaction - * */ static void distribute_txn_assign_batchid(struct top_multiple_thandle *new) { @@ -549,18 +552,16 @@ static int prepare_multiple_node_trans(const struct lu_env *env, int rc; ENTRY; - /* Prepare the update buffer for recording updates */ - if (tmt->tmt_update_records != NULL) - RETURN(0); - - tur = &update_env_info(env)->uti_tur; - rc = check_and_prepare_update_record(env, tur); - if (rc < 0) - RETURN(rc); + if (tmt->tmt_update_records == NULL) { + tur = &update_env_info(env)->uti_tur; + rc = check_and_prepare_update_record(env, tur); + if (rc < 0) + RETURN(rc); - tmt->tmt_update_records = tur; + tmt->tmt_update_records = tur; + distribute_txn_assign_batchid(tmt); + } - distribute_txn_assign_batchid(tmt); rc = declare_updates_write(env, tmt); RETURN(rc); @@ -588,14 +589,13 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, int rc = 0; ENTRY; - /* Walk through all of sub transaction to see if it needs to - * record updates for this transaction */ if (tmt == NULL) { rc = dt_trans_start(env, top_th->tt_master_sub_thandle->th_dev, top_th->tt_master_sub_thandle); RETURN(rc); } + tmt = top_th->tt_multiple_thandle; rc = prepare_multiple_node_trans(env, tmt); if (rc < 0) RETURN(rc); @@ -609,11 +609,12 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, rc = dt_trans_start(env, st->st_sub_th->th_dev, st->st_sub_th); if (rc != 0) - RETURN(rc); + GOTO(out, rc); sub_thandle_register_commit_cb(st, tmt); } - +out: + th->th_result = rc; RETURN(rc); } EXPORT_SYMBOL(top_trans_start); @@ -684,6 +685,7 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, ENTRY; if (likely(top_th->tt_multiple_thandle == NULL)) { + LASSERT(master_dev != NULL); rc = dt_trans_stop(env, master_dev, top_th->tt_master_sub_thandle); OBD_FREE_PTR(top_th); @@ -698,12 +700,6 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, * then these update logs will be sent to other MDTs */ /* get the master sub thandle */ master_st = lookup_sub_thandle(tmt, tmt->tmt_master_sub_dt); - if (master_st == NULL) { - top_multiple_thandle_dump(tmt, D_ERROR); - if (th->th_result == 0) - LBUG(); - } - write_updates = top_check_write_updates(top_th); /* Step 1: write the updates log on Master MDT */ @@ -722,7 +718,7 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, lur = tur->tur_update_records; /* Write updates to the master MDT */ - rc = sub_updates_write(env, lur, top_th->tt_master_sub_thandle, + rc = sub_updates_write(env, lur, master_st->st_sub_th, &master_st->st_cookie); /* Cleanup the common parameters in the update records, @@ -787,8 +783,10 @@ stop_master_trans: rc = sub_updates_write(env, lur, st->st_sub_th, &st->st_cookie); - if (rc < 0) + if (rc < 0) { + th->th_result = rc; break; + } } } @@ -830,8 +828,8 @@ EXPORT_SYMBOL(top_trans_stop); * \retval 0 if creation succeeds * \retval negative errno if creation fails */ -static int top_trans_create_tmt(const struct lu_env *env, - struct top_thandle *top_th) +int top_trans_create_tmt(const struct lu_env *env, + struct top_thandle *top_th) { struct top_multiple_thandle *tmt; @@ -1443,7 +1441,10 @@ void distribute_txn_fini(const struct lu_env *env, wait_event(lut->lut_tdtd_commit_thread.t_ctl_waitq, lut->lut_tdtd_commit_thread.t_flags & SVC_STOPPED); - if (tdtd->tdtd_batchid_obj != NULL) + dtrq_list_destroy(tdtd); + if (tdtd->tdtd_batchid_obj != NULL) { lu_object_put(env, &tdtd->tdtd_batchid_obj->do_lu); + tdtd->tdtd_batchid_obj = NULL; + } } EXPORT_SYMBOL(distribute_txn_fini); diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh index 11dc37f..811d99e 100644 --- a/lustre/tests/conf-sanity.sh +++ b/lustre/tests/conf-sanity.sh @@ -321,7 +321,7 @@ test_1() { run_test 1 "start up ost twice (should return errors)" test_2() { - start_mds || error "MDT0 start fail" + start_mds || error "MDT start failed" echo "start mds second time.." start_mds && error "2nd MDT start should fail" start_ost || error "OST start failed" diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index cae8c16..3217c8f 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -684,18 +684,17 @@ test_17o() { local WDIR=$DIR/${tdir}o local mdt_index - local mdtdevname local rc=0 mkdir -p $WDIR mdt_index=$($LFS getstripe -M $WDIR) mdt_index=$((mdt_index+1)) - mdtdevname=$(mdsdevname $mdt_index) touch $WDIR/$tfile - stop mds${mdt_index} - start mds${mdt_index} $mdtdevname $MDS_MOUNT_OPTS || - error "mount mds${mdt_index} failed" + + #fail mds will wait the failover finish then set + #following fail_loc to avoid interfer the recovery process. + fail mds${mdt_index} #define OBD_FAIL_OSD_LMA_INCOMPAT 0x194 do_facet mds${mdt_index} lctl set_param fail_loc=0x194 -- 1.8.3.1