4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * Lustre Unified Target
32 * These are common function to work with last_received file
34 * Author: Mikhail Pershin <mike.pershin@intel.com>
37 #include <obd_class.h>
38 #include <lustre_fid.h>
40 #include "tgt_internal.h"
42 /** version recovery epoch */
43 #define LR_EPOCH_BITS 32
45 /* Allocate a bitmap for a chunk of reply data slots */
/* Lazily allocates lut_reply_bitmap[chunk]. The buffer is allocated
 * outside the spinlock; if another thread won the race and installed a
 * bitmap first, ours is freed and theirs is kept.
 * NOTE(review): several lines (error checks / return paths) are elided
 * in this view — confirm allocation-failure handling in full source. */
46 static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
50 OBD_ALLOC_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
55 spin_lock(&lut->lut_client_bitmap_lock);
57 if (lut->lut_reply_bitmap[chunk] != NULL) {
58 /* someone else already allocated the bitmap for this chunk */
59 spin_unlock(&lut->lut_client_bitmap_lock);
60 OBD_FREE_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
/* publish the freshly allocated bitmap while still under the lock */
65 lut->lut_reply_bitmap[chunk] = bm;
67 spin_unlock(&lut->lut_client_bitmap_lock);
72 /* Look for an available reply data slot in the bitmap
74 * Allocate bitmap chunk when first used
75 * XXX algo could be improved if this routine limits performance
 *
 * Returns the global slot index (chunk * LUT_REPLY_SLOTS_PER_CHUNK + bit)
 * on success. Scans chunks in order; within a chunk, retries via
 * test_and_set_bit since find_first_zero_bit is not atomic with the set.
 */
77 static int tgt_find_free_reply_slot(struct lu_target *lut)
84 for (chunk = 0; chunk < LUT_REPLY_SLOTS_MAX_CHUNKS; chunk++) {
85 /* allocate the bitmap chunk if necessary */
86 if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
87 rc = tgt_bitmap_chunk_alloc(lut, chunk);
91 bmp = lut->lut_reply_bitmap[chunk];
93 /* look for an available slot in this chunk */
95 b = find_first_zero_bit(bmp, LUT_REPLY_SLOTS_PER_CHUNK);
96 if (b >= LUT_REPLY_SLOTS_PER_CHUNK)
/* a racing thread may have taken bit b; only claim it if still clear */
100 if (test_and_set_bit(b, bmp) == 0)
101 return chunk * LUT_REPLY_SLOTS_PER_CHUNK + b;
108 /* Mark the reply data slot @idx 'used' in the corresponding bitmap chunk
110 * Allocate the bitmap chunk if necessary
 *
 * Used when rebuilding in-memory state from an on-disk reply_data file:
 * @idx is a global slot index, split into chunk number and bit offset.
 * Reports an error if the slot was already marked used.
 */
112 static int tgt_set_reply_slot(struct lu_target *lut, int idx)
118 chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
119 b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
121 LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
122 LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
124 /* allocate the bitmap chunk if necessary */
125 if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
126 rc = tgt_bitmap_chunk_alloc(lut, chunk);
131 /* mark the slot 'used' in this chunk */
132 if (test_and_set_bit(b, lut->lut_reply_bitmap[chunk]) != 0) {
133 CERROR("%s: slot %d already set in bitmap\n",
142 /* Mark the reply data slot @idx 'unused' in the corresponding bitmap chunk
 *
 * Counterpart of tgt_set_reply_slot(). Errors out if the chunk was never
 * allocated or the bit is already clear (double free of a slot).
 */
145 static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
/* during shutdown/failover, keep the slot marked used (see comment) */
150 if (lut->lut_obd->obd_stopping)
152 * in case of failover keep the bit set in order to
153 * avoid overwriting slots in reply_data which might
154 * be required by resent rpcs
157 chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
158 b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
160 LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
161 LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
163 if (lut->lut_reply_bitmap[chunk] == NULL) {
164 CERROR("%s: slot %d not allocated\n",
169 if (test_and_clear_bit(b, lut->lut_reply_bitmap[chunk]) == 0) {
170 CERROR("%s: slot %d already clear in bitmap\n",
179 /* Read header of reply_data file of target @tgt into structure @lrh
 *
 * Reads the on-disk (little-endian) lsd_reply_header into a stack buffer
 * and converts the fields to CPU byte order in @lrh.
 */
180 static int tgt_reply_header_read(const struct lu_env *env,
181 struct lu_target *tgt,
182 struct lsd_reply_header *lrh)
185 struct lsd_reply_header buf;
186 struct tgt_thread_info *tti = tgt_th_info(env);
189 tti->tti_buf.lb_buf = &buf;
190 tti->tti_buf.lb_len = sizeof(buf);
192 rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
/* convert on-disk little-endian fields to host byte order */
197 lrh->lrh_magic = le32_to_cpu(buf.lrh_magic);
198 lrh->lrh_header_size = le32_to_cpu(buf.lrh_header_size);
199 lrh->lrh_reply_size = le32_to_cpu(buf.lrh_reply_size);
201 CDEBUG(D_HA, "%s: read %s header. magic=0x%08x "
202 "header_size=%d reply_size=%d\n",
203 tgt->lut_obd->obd_name, REPLY_DATA,
204 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
209 /* Write header into replay_data file of target @tgt from structure @lrh
 *
 * Converts @lrh to little-endian and writes it through its own local
 * transaction (declare, start, write, stop). Skipped on read-only
 * bottom devices.
 */
210 static int tgt_reply_header_write(const struct lu_env *env,
211 struct lu_target *tgt,
212 struct lsd_reply_header *lrh)
215 struct lsd_reply_header buf;
216 struct tgt_thread_info *tti = tgt_th_info(env);
218 struct dt_object *dto;
220 CDEBUG(D_HA, "%s: write %s header. magic=0x%08x "
221 "header_size=%d reply_size=%d\n",
222 tgt->lut_obd->obd_name, REPLY_DATA,
223 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
/* nothing to persist on a read-only backend */
225 if (tgt->lut_bottom->dd_rdonly)
228 buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
229 buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
230 buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
232 th = dt_trans_create(env, tgt->lut_bottom);
238 tti->tti_buf.lb_buf = &buf;
239 tti->tti_buf.lb_len = sizeof(buf);
241 rc = dt_declare_record_write(env, tgt->lut_reply_data,
242 &tti->tti_buf, tti->tti_off, th);
246 rc = dt_trans_start(env, tgt->lut_bottom, th);
/* locate the object on the transaction's device before writing */
250 dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
251 rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
253 dt_trans_stop(env, tgt->lut_bottom, th);
257 /* Write the reply data @lrd into reply_data file of target @tgt
 *
 * Serializes @lrd to little-endian into the per-thread buffer and writes
 * lrh_reply_size bytes at offset @off inside the caller's transaction @th.
 * lrd_result is converted to network status only for the on-disk copy
 * and restored afterwards, leaving @lrd unchanged for the caller.
 */
260 static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
261 struct lsd_reply_data *lrd, loff_t off,
264 struct tgt_thread_info *tti = tgt_th_info(env);
265 struct lsd_reply_data *buf = &tti->tti_lrd;
266 struct lsd_reply_header *lrh = &tgt->lut_reply_header;
267 struct dt_object *dto;
269 lrd->lrd_result = ptlrpc_status_hton(lrd->lrd_result);
271 buf->lrd_transno = cpu_to_le64(lrd->lrd_transno);
272 buf->lrd_xid = cpu_to_le64(lrd->lrd_xid);
273 buf->lrd_data = cpu_to_le64(lrd->lrd_data);
274 buf->lrd_result = cpu_to_le32(lrd->lrd_result);
275 buf->lrd_client_gen = cpu_to_le32(lrd->lrd_client_gen);
/* undo the hton conversion done above for serialization */
277 lrd->lrd_result = ptlrpc_status_ntoh(lrd->lrd_result);
/* batch index only exists in V2+ record format */
279 if (lrh->lrh_magic > LRH_MAGIC_V1)
280 buf->lrd_batch_idx = cpu_to_le32(lrd->lrd_batch_idx);
283 tti->tti_buf.lb_buf = buf;
284 tti->tti_buf.lb_len = lrh->lrh_reply_size;
286 dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
287 return dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
290 /* Read the reply data from reply_data file of target @tgt at offset @off
291 * into structure @lrd
 *
 * @lrh supplies the record size (format version dependent); fields are
 * converted from little-endian to CPU order. For pre-V2 records the
 * batch index does not exist on disk and is zeroed.
 */
293 static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt,
294 struct lsd_reply_data *lrd, loff_t off,
295 struct lsd_reply_header *lrh)
297 struct tgt_thread_info *tti = tgt_th_info(env);
298 struct lsd_reply_data *buf = &tti->tti_lrd;
302 tti->tti_buf.lb_buf = buf;
303 tti->tti_buf.lb_len = lrh->lrh_reply_size;
305 rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
310 lrd->lrd_transno = le64_to_cpu(buf->lrd_transno);
311 lrd->lrd_xid = le64_to_cpu(buf->lrd_xid);
312 lrd->lrd_data = le64_to_cpu(buf->lrd_data);
313 lrd->lrd_result = le32_to_cpu(buf->lrd_result);
314 lrd->lrd_client_gen = le32_to_cpu(buf->lrd_client_gen);
316 if (lrh->lrh_magic > LRH_MAGIC_V1)
317 lrd->lrd_batch_idx = le32_to_cpu(buf->lrd_batch_idx);
/* V1 records have no batch index on disk */
319 lrd->lrd_batch_idx = 0;
324 /* Free the in-memory reply data structure @trd and release
325 * the corresponding slot in the reply_data file of target @lut
326 * Called with ted_lcd_lock held
 *
 * @lut may be NULL (e.g. target already freed); in that case only the
 * in-memory state is released and no bitmap slot is cleared.
 */
328 static void tgt_free_reply_data(struct lu_target *lut,
329 struct tg_export_data *ted,
330 struct tg_reply_data *trd)
332 CDEBUG(D_TRACE, "%s: free reply data %p: xid %llu, transno %llu, "
333 "client gen %u, slot idx %d\n",
334 lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
335 trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
338 LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
340 list_del(&trd->trd_list);
341 ted->ted_reply_cnt--;
/* TRD_INDEX_MEMORY means the entry was never backed by a disk slot */
342 if (lut != NULL && trd->trd_index != TRD_INDEX_MEMORY)
343 tgt_clear_reply_slot(lut, trd->trd_index);
347 /* Release the reply data @trd from target @lut
348 * The reply data with the highest transno for this export
349 * is retained to ensure correctness of target recovery
350 * Called with ted_lcd_lock held
 *
 * If @trd matches the export's last committed transno it is moved to
 * ted_reply_last (replacing any previous retained entry) instead of
 * being freed; otherwise it is freed immediately.
 */
352 static void tgt_release_reply_data(struct lu_target *lut,
353 struct tg_export_data *ted,
354 struct tg_reply_data *trd)
356 CDEBUG(D_TRACE, "%s: release reply data %p: xid %llu, transno %llu, "
357 "client gen %u, slot idx %d\n",
358 lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
359 trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
362 LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
364 /* Do not free the reply data corresponding to the
365 * highest transno of this export.
366 * This ensures on-disk reply data is kept and
367 * last committed transno can be restored from disk in case
370 if (trd->trd_reply.lrd_transno == ted->ted_lcd->lcd_last_transno) {
371 /* free previous retained reply */
372 if (ted->ted_reply_last != NULL)
373 tgt_free_reply_data(lut, ted, ted->ted_reply_last);
374 /* retain the reply */
375 list_del_init(&trd->trd_list);
376 ted->ted_reply_last = trd;
378 tgt_free_reply_data(lut, ted, trd);
/* Point the per-thread lu_buf at the thread's lsd (server data) buffer
 * and return it, ready for dt_record_read/write. */
382 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
384 tti->tti_buf.lb_buf = &tti->tti_lsd;
385 tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
386 return &tti->tti_buf;
/* Point the per-thread lu_buf at the thread's lcd (client data) buffer
 * and return it, ready for dt_record_read/write. */
389 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
391 tti->tti_buf.lb_buf = &tti->tti_lcd;
392 tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
393 return &tti->tti_buf;
/* True when a last_rcvd client record belongs to a multi-modifying-RPCs
 * client: the target advertises OBD_INCOMPAT_MULTI_RPCS and the record
 * carries a non-zero slot generation. */
396 static inline bool tgt_is_multimodrpcs_record(struct lu_target *tgt,
397 struct lsd_client_data *lcd)
399 return tgt->lut_lsd.lsd_feature_incompat & OBD_INCOMPAT_MULTI_RPCS &&
400 lcd->lcd_generation != 0;
404 * Allocate in-memory data for client slot related to export.
 *
 * Initializes the export's tg_export_data locks/lists and allocates the
 * lsd_client_data. ted_lr_idx is set to -1 until a last_rcvd slot is
 * actually assigned (0 is a valid slot, so it cannot mean "unset").
 */
406 int tgt_client_alloc(struct obd_export *exp)
409 LASSERT(exp != exp->exp_obd->obd_self_export);
411 spin_lock_init(&exp->exp_target_data.ted_nodemap_lock);
412 INIT_LIST_HEAD(&exp->exp_target_data.ted_nodemap_member);
413 spin_lock_init(&exp->exp_target_data.ted_fmd_lock);
414 INIT_LIST_HEAD(&exp->exp_target_data.ted_fmd_list);
416 OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
417 if (exp->exp_target_data.ted_lcd == NULL)
419 /* Mark that slot is not yet valid, 0 doesn't work here */
420 exp->exp_target_data.ted_lr_idx = -1;
421 INIT_LIST_HEAD(&exp->exp_target_data.ted_reply_list);
422 mutex_init(&exp->exp_target_data.ted_lcd_lock);
425 EXPORT_SYMBOL(tgt_client_alloc);
428 * Free in-memory data for client slot related to export.
 *
 * Releases all cached reply data (including the retained "last" entry),
 * removes the export from the generation hash if present, frees the lcd
 * and finally clears the client's bit in the last_rcvd bitmap — unless
 * the target is already gone (LU-7430) or no slot was ever assigned.
 */
430 void tgt_client_free(struct obd_export *exp)
432 struct tg_export_data *ted = &exp->exp_target_data;
433 struct lu_target *lut = class_exp2tgt(exp);
434 struct tg_reply_data *trd, *tmp;
436 LASSERT(exp != exp->exp_obd->obd_self_export);
438 tgt_fmd_cleanup(exp);
440 /* free reply data */
441 mutex_lock(&ted->ted_lcd_lock);
442 list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
443 tgt_release_reply_data(lut, ted, trd);
445 if (ted->ted_reply_last != NULL) {
446 tgt_free_reply_data(lut, ted, ted->ted_reply_last);
447 ted->ted_reply_last = NULL;
449 mutex_unlock(&ted->ted_lcd_lock);
451 if (!hlist_unhashed(&exp->exp_gen_hash))
452 cfs_hash_del(exp->exp_obd->obd_gen_hash,
453 &ted->ted_lcd->lcd_generation,
456 OBD_FREE_PTR(ted->ted_lcd);
459 /* Target may have been freed (see LU-7430)
460 * Slot may be not yet assigned */
461 if (((struct obd_device_target *)(&exp->exp_obd->u))->obt_magic !=
466 /* Clear bit when lcd is freed */
467 LASSERT(lut && lut->lut_client_bitmap);
468 if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
469 CERROR("%s: client %u bit already clear in bitmap\n",
470 exp->exp_obd->obd_name, ted->ted_lr_idx);
474 EXPORT_SYMBOL(tgt_client_free);
/* Sanity-check a client record read from last_rcvd: if the UUID field is
 * not NUL-terminated, force-terminate it and complain loudly, since a
 * corrupt UUID indicates a bad on-disk record at @index. */
476 static inline void tgt_check_lcd(const char *obd_name, int index,
477 struct lsd_client_data *lcd)
479 size_t uuid_size = sizeof(lcd->lcd_uuid);
481 if (strnlen((char*)lcd->lcd_uuid, uuid_size) == uuid_size) {
482 lcd->lcd_uuid[uuid_size - 1] = '\0';
484 LCONSOLE_ERROR("the client UUID (%s) on %s for exports stored in last_rcvd(index = %d) is bad!\n",
485 lcd->lcd_uuid, obd_name, index);
/* Read one client record from last_rcvd at offset @off into @lcd.
 * The raw record is validated (tgt_check_lcd), converted from
 * little-endian, and the ptlrpc status fields converted to host form.
 * @index is the client slot number, used only for diagnostics. */
489 static int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
490 struct lsd_client_data *lcd,
491 loff_t *off, int index)
493 struct tgt_thread_info *tti = tgt_th_info(env);
497 rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
499 tgt_check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
500 lcd_le_to_cpu(&tti->tti_lcd, lcd);
501 lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
502 lcd->lcd_last_close_result =
503 ptlrpc_status_ntoh(lcd->lcd_last_close_result);
506 CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = %llu"
507 ", last_xid = %llu, last_result = %u, last_data = %u, "
508 "last_close_transno = %llu, last_close_xid = %llu, "
509 "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
510 *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
511 lcd->lcd_last_result, lcd->lcd_last_data,
512 lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
513 lcd->lcd_last_close_result, rc);
/* Write one client record @lcd to last_rcvd at offset @off within the
 * caller's transaction @th. Status fields are converted to network form
 * and the record to little-endian before writing.
 * NOTE(review): the hton conversion mutates @lcd in place; whether it is
 * restored afterwards is not visible in this view — confirm in full
 * source. */
517 static int tgt_client_data_write(const struct lu_env *env,
518 struct lu_target *tgt,
519 struct lsd_client_data *lcd,
520 loff_t *off, struct thandle *th)
522 struct tgt_thread_info *tti = tgt_th_info(env);
523 struct dt_object *dto;
525 lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
526 lcd->lcd_last_close_result =
527 ptlrpc_status_hton(lcd->lcd_last_close_result);
528 lcd_cpu_to_le(lcd, &tti->tti_lcd);
531 dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
532 return dt_record_write(env, dto, &tti->tti_buf, off, th);
/* Commit-callback context pairing the generic dt callback with the
 * export whose first write is being committed. */
535 struct tgt_new_client_callback {
536 struct dt_txn_commit_cb lncc_cb;
537 struct obd_export *lncc_exp;
/* Transaction commit callback for a new client's first last_rcvd write:
 * once the record is on disk, clear exp_need_sync so subsequent requests
 * from this export no longer force synchronous commits, and drop the
 * export reference taken when the callback was registered. */
540 static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
541 struct dt_txn_commit_cb *cb, int err)
543 struct tgt_new_client_callback *ccb;
545 ccb = container_of(cb, struct tgt_new_client_callback, lncc_cb);
547 LASSERT(ccb->lncc_exp->exp_obd);
549 CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
550 ccb->lncc_exp->exp_obd->obd_name,
551 ccb->lncc_exp->exp_client_uuid.uuid);
553 spin_lock(&ccb->lncc_exp->exp_lock);
555 ccb->lncc_exp->exp_need_sync = 0;
557 spin_unlock(&ccb->lncc_exp->exp_lock);
558 class_export_cb_put(ccb->lncc_exp);
/* Register tgt_cb_new_client on transaction @th for export @exp.
 * Takes a callback reference on the export; on dt_trans_cb_add failure
 * the reference is dropped again. */
563 static int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
565 struct tgt_new_client_callback *ccb;
566 struct dt_txn_commit_cb *dcb;
573 ccb->lncc_exp = class_export_cb_get(exp);
576 dcb->dcb_func = tgt_cb_new_client;
577 INIT_LIST_HEAD(&dcb->dcb_linkage);
578 strscpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
580 rc = dt_trans_cb_add(th, dcb);
582 class_export_cb_put(exp);
589 * Update client data in last_rcvd
 *
 * Writes the export's lsd_client_data at its assigned offset in a local
 * transaction. A commit callback is registered so the export stays in
 * sync mode until this write commits; if the callback cannot be added,
 * exp_need_sync is forced on instead. No-op on read-only backends.
 */
591 static int tgt_client_data_update(const struct lu_env *env,
592 struct obd_export *exp)
594 struct tg_export_data *ted = &exp->exp_target_data;
595 struct lu_target *tgt = class_exp2tgt(exp);
596 struct tgt_thread_info *tti = tgt_th_info(env);
602 if (unlikely(tgt == NULL)) {
603 CDEBUG(D_ERROR, "%s: No target for connected export\n",
604 class_exp2obd(exp)->obd_name);
608 if (tgt->lut_bottom->dd_rdonly)
611 th = dt_trans_create(env, tgt->lut_bottom);
616 rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
618 ted->ted_lr_off, th);
622 rc = dt_trans_start_local(env, tgt->lut_bottom, th);
626 mutex_lock(&ted->ted_lcd_lock);
629 * Until this operations will be committed the sync is needed
630 * for this export. This should be done _after_ starting the
631 * transaction so that many connecting clients will not bring
632 * server down with lots of sync writes.
634 rc = tgt_new_client_cb_add(th, exp);
636 /* can't add callback, do sync now */
639 spin_lock(&exp->exp_lock);
640 exp->exp_need_sync = 1;
641 spin_unlock(&exp->exp_lock);
644 tti->tti_off = ted->ted_lr_off;
645 rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
647 mutex_unlock(&ted->ted_lcd_lock);
651 dt_trans_stop(env, tgt->lut_bottom, th);
652 CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
653 "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
654 tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
/* Read the server data record from last_rcvd into tgt->lut_lsd,
 * converting from little-endian on-disk format. */
659 static int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
661 struct tgt_thread_info *tti = tgt_th_info(env);
666 rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
669 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
671 CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
672 "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
673 tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
/* Write tgt->lut_lsd (server data) to last_rcvd within the caller's
 * transaction @th, converting to little-endian first. */
677 static int tgt_server_data_write(const struct lu_env *env,
678 struct lu_target *tgt, struct thandle *th)
680 struct tgt_thread_info *tti = tgt_th_info(env);
681 struct dt_object *dto;
688 lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
690 dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
691 rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
693 CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
694 "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
695 tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
701 * Update server data in last_rcvd
 *
 * Refreshes lsd_last_transno from lut_last_transno under lut_translock,
 * then persists the server record in its own transaction. No-op on
 * read-only backends.
 */
703 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
706 struct tgt_thread_info *tti = tgt_th_info(env);
713 "%s: mount_count is %llu, last_transno is %llu\n",
714 tgt->lut_lsd.lsd_uuid, obd2obt(tgt->lut_obd)->obt_mount_count,
715 tgt->lut_last_transno);
717 /* Always save latest transno to keep it fresh */
718 spin_lock(&tgt->lut_translock);
719 tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
720 spin_unlock(&tgt->lut_translock);
722 if (tgt->lut_bottom->dd_rdonly)
725 th = dt_trans_create(env, tgt->lut_bottom);
732 rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
733 &tti->tti_buf, tti->tti_off, th);
737 rc = dt_trans_start(env, tgt->lut_bottom, th);
741 rc = tgt_server_data_write(env, tgt, th);
743 dt_trans_stop(env, tgt->lut_bottom, th);
745 CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
746 "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
747 tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
750 EXPORT_SYMBOL(tgt_server_data_update);
/* Truncate dt object @dt to @size: punch [size, EOF] and set LA_SIZE in
 * a single local transaction. Used to reset the reply_data file before
 * a format upgrade. No-op on read-only backends. */
752 static int tgt_truncate_object(const struct lu_env *env, struct lu_target *tgt,
753 struct dt_object *dt, loff_t size)
761 if (tgt->lut_bottom->dd_rdonly)
765 attr.la_valid = LA_SIZE;
767 th = dt_trans_create(env, tgt->lut_bottom);
770 rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
773 rc = dt_declare_attr_set(env, dt, &attr, th);
776 rc = dt_trans_start_local(env, tgt->lut_bottom, th);
780 rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th);
782 rc = dt_attr_set(env, dt, &attr, th);
785 dt_trans_stop(env, tgt->lut_bottom, th);
/* VBR: bring a client's last_epoch up to the server's current start
 * epoch and persist the record; skipped if the client is already at or
 * beyond the current epoch. */
790 static void tgt_client_epoch_update(const struct lu_env *env,
791 struct obd_export *exp)
793 struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
794 struct lu_target *tgt = class_exp2tgt(exp);
796 LASSERT(tgt && tgt->lut_bottom);
797 /** VBR: set client last_epoch to current epoch */
798 if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
800 lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
801 tgt_client_data_update(env, exp);
/* Check the reply_data file format and upgrade it to the current
 * LRH_MAGIC if needed: truncate the file and rewrite a fresh header
 * with the record size matching the (new) format version.
 * Skipped entirely on targets without a reply_data object (OST). */
804 static int tgt_reply_data_upgrade_check(const struct lu_env *env,
805 struct lu_target *tgt)
807 struct lsd_reply_header *lrh = &tgt->lut_reply_header;
811 * Reply data is supported by MDT targets only for now.
812 * When reply data object @lut_reply_data is NULL, it indicates the
813 * target type is OST and it should skip the upgrade check.
815 if (tgt->lut_reply_data == NULL)
818 rc = tgt_reply_header_read(env, tgt, lrh);
820 CERROR("%s: failed to read %s: rc = %d\n",
821 tgt_name(tgt), REPLY_DATA, rc);
/* already at the current format: nothing to upgrade */
825 if (lrh->lrh_magic == LRH_MAGIC)
828 rc = tgt_truncate_object(env, tgt, tgt->lut_reply_data, 0);
830 CERROR("%s: failed to truncate %s: rc = %d\n",
831 tgt_name(tgt), REPLY_DATA, rc);
835 lrh->lrh_magic = LRH_MAGIC;
836 lrh->lrh_header_size = sizeof(struct lsd_reply_header);
/* record size depends on which version LRH_MAGIC resolves to */
837 if (lrh->lrh_magic == LRH_MAGIC_V1)
838 lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v1);
840 lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v2);
842 rc = tgt_reply_header_write(env, tgt, lrh);
844 CERROR("%s: failed to write header for %s: rc = %d\n",
845 tgt_name(tgt), REPLY_DATA, rc);
851 * Update boot epoch when recovery ends
 *
 * Advances the server epoch (upper LR_EPOCH_BITS of the transno),
 * propagates the new epoch to every export that took part in recovery
 * (the final request queue is temporarily spliced to a private list so
 * resends can keep arriving), clears the MULTI_RPCS incompat flag when
 * no such clients remain, and persists the server data.
 */
853 void tgt_boot_epoch_update(struct lu_target *tgt)
856 struct ptlrpc_request *req;
858 LIST_HEAD(client_list);
861 if (tgt->lut_obd->obd_stopping)
864 rc = lu_env_init(&env, LCT_DT_THREAD);
866 CERROR("%s: can't initialize environment: rc = %d\n",
867 tgt->lut_obd->obd_name, rc);
871 spin_lock(&tgt->lut_translock);
/* bump epoch: transno layout is [epoch | per-epoch counter] */
872 start_epoch = (tgt->lut_last_transno >> LR_EPOCH_BITS) + 1;
873 tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
874 tgt->lut_lsd.lsd_start_epoch = start_epoch;
875 spin_unlock(&tgt->lut_translock);
878 * The recovery is not yet finished and final queue can still be updated
879 * with resend requests. Move final list to separate one for processing
881 spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
882 list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
883 spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
886 * go through list of exports participated in recovery and
887 * set new epoch for them
889 list_for_each_entry(req, &client_list, rq_list) {
890 LASSERT(!req->rq_export->exp_delayed);
891 if (!req->rq_export->exp_vbr_failed)
892 tgt_client_epoch_update(&env, req->rq_export);
894 /** return list back at once */
895 spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
896 list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
897 spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
900 * Clear MULTI RPCS incompatibility flag if there is no multi-rpcs
901 * client in last_rcvd file
903 if (atomic_read(&tgt->lut_num_clients) == 0)
904 tgt->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
906 /** update server epoch */
907 tgt_server_data_update(&env, tgt, 1);
908 tgt_reply_data_upgrade_check(&env, tgt);
913 * commit callback, need to update last_committed value
915 struct tgt_last_committed_callback {
916 struct dt_txn_commit_cb llcc_cb;
917 struct lu_target *llcc_tgt;
918 struct obd_export *llcc_exp;
/* Transaction commit callback: release any reserved quota, then raise
 * obd_last_committed / exp_last_committed to llcc_transno, wake commit
 * waiters (ptlrpc_commit_replies) and cancel covered SLC locks. On
 * commit error the last_committed values are left untouched so the data
 * can be replayed. Drops the export callback reference at the end. */
922 static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
923 struct dt_txn_commit_cb *cb, int err)
925 struct tgt_last_committed_callback *ccb;
927 ccb = container_of(cb, struct tgt_last_committed_callback, llcc_cb);
929 LASSERT(ccb->llcc_exp);
930 LASSERT(ccb->llcc_tgt != NULL);
931 LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
933 if (th->th_reserved_quota.lqi_space > 0) {
934 CDEBUG(D_QUOTA, "free quota %llu %llu\n",
935 th->th_reserved_quota.lqi_id.qid_gid,
936 th->th_reserved_quota.lqi_space);
938 /* env can be NULL for freeing reserved quota */
/* negate to turn the "reserve" amount into a "free" request */
939 th->th_reserved_quota.lqi_space *= -1;
940 dt_reserve_or_free_quota(NULL, th->th_dev,
941 &th->th_reserved_quota);
944 /* error hit, don't update last committed to provide chance to
945 * replay data after fail */
949 /* Fast path w/o spinlock, if exp_last_committed was updated
950 * with higher transno, no need to take spinlock and check,
951 * also no need to update obd_last_committed. */
952 if (ccb->llcc_transno <= ccb->llcc_exp->exp_last_committed)
954 spin_lock(&ccb->llcc_tgt->lut_translock);
955 if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
956 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
958 if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
959 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
960 spin_unlock(&ccb->llcc_tgt->lut_translock);
962 ptlrpc_commit_replies(ccb->llcc_exp);
963 tgt_cancel_slc_locks(ccb->llcc_tgt, ccb->llcc_transno);
965 spin_unlock(&ccb->llcc_tgt->lut_translock);
968 CDEBUG(D_HA, "%s: transno %lld is committed\n",
969 ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
972 class_export_cb_put(ccb->llcc_exp);
977 * Add commit callback function, it returns a non-zero value to inform
978 * caller to use sync transaction if necessary.
 *
 * Registers tgt_cb_last_committed on @th for (@exp, @transno), taking a
 * callback reference on the export (dropped on registration failure).
 * Lightweight connections report failure to force a synchronous commit;
 * otherwise exp_need_sync propagates the sync requirement to the caller.
 */
980 static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
981 struct obd_export *exp, __u64 transno)
983 struct tgt_last_committed_callback *ccb;
984 struct dt_txn_commit_cb *dcb;
992 ccb->llcc_exp = class_export_cb_get(exp);
993 ccb->llcc_transno = transno;
996 dcb->dcb_func = tgt_cb_last_committed;
997 INIT_LIST_HEAD(&dcb->dcb_linkage);
998 strscpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
1000 rc = dt_trans_cb_add(th, dcb);
1002 class_export_cb_put(exp);
1006 if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1007 /* report failure to force synchronous operation */
1010 /* if exp_need_sync is set, return non-zero value to force
1011 * a sync transaction. */
1012 return rc ? rc : exp->exp_need_sync;
/* Decide whether @exp is a local (same-node) client that should skip
 * recovery tracking: MDS-to-MDS/OST connections are never "local", and
 * the check only applies when lut_local_recovery is not forced on.
 * Uses LNetIsPeerLocal() on the request's peer NID. */
1015 static int tgt_is_local_client(const struct lu_env *env,
1016 struct obd_export *exp)
1018 struct lu_target *tgt = class_exp2tgt(exp);
1019 struct tgt_session_info *tsi = tgt_ses_info(env);
1020 struct ptlrpc_request *req = tgt_ses_req(tsi);
1022 if (exp_connect_flags(exp) & OBD_CONNECT_MDS ||
1023 exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1025 if (tgt->lut_local_recovery)
1029 if (!LNetIsPeerLocal(&req->rq_peer.nid))
1036 * Add new client to the last_rcvd upon new connection.
1038 * We use a bitmap to locate a free space in the last_rcvd file and initialize
 *
 * Skips self-exports, lightweight connections and local clients (the
 * latter run without recovery). Claims a free bit in lut_client_bitmap
 * (retrying on races via test_and_set_bit), derives the record offset
 * from the slot index, handles MULTI_RPCS feature-flag setup and slot
 * generation for multi-modify-RPCs clients, then persists the record.
 */
1041 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
1043 struct tg_export_data *ted = &exp->exp_target_data;
1044 struct lu_target *tgt = class_exp2tgt(exp);
1049 LASSERT(tgt && tgt->lut_client_bitmap != NULL);
/* a connection from the target to itself needs no last_rcvd slot */
1050 if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
1053 if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1056 if (tgt_is_local_client(env, exp)) {
1057 LCONSOLE_WARN("%s: local client %s w/o recovery\n",
1058 exp->exp_obd->obd_name, ted->ted_lcd->lcd_uuid);
1059 exp->exp_no_recovery = 1;
1063 /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
1064 * there's no need for extra complication here
1066 idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
1068 if (idx >= LR_MAX_CLIENTS ||
1069 CFS_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
1070 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
1071 tgt->lut_obd->obd_name, idx);
/* lost the race for this bit: look for the next free one */
1074 if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1075 idx = find_next_zero_bit(tgt->lut_client_bitmap,
1076 LR_MAX_CLIENTS, idx);
1080 ted->ted_lr_idx = idx;
1081 ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1082 idx * tgt->lut_lsd.lsd_client_size;
1084 LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
1086 if (tgt_is_multimodrpcs_client(exp)) {
1087 /* Set MULTI RPCS incompatibility flag to prevent previous
1088 * Lustre versions to mount a target with reply_data file */
1089 if (!(tgt->lut_lsd.lsd_feature_incompat &
1090 OBD_INCOMPAT_MULTI_RPCS)) {
1091 tgt->lut_lsd.lsd_feature_incompat |=
1092 OBD_INCOMPAT_MULTI_RPCS;
1093 rc = tgt_server_data_update(env, tgt, 1);
1095 CERROR("%s: unable to set MULTI RPCS "
1096 "incompatibility flag\n",
1097 exp->exp_obd->obd_name);
1102 /* assign client slot generation */
1103 ted->ted_lcd->lcd_generation =
1104 atomic_inc_return(&tgt->lut_client_generation);
/* non-multi-rpcs clients carry generation 0 */
1106 ted->ted_lcd->lcd_generation = 0;
1109 CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s' "
1111 tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1112 ted->ted_lcd->lcd_uuid, ted->ted_lcd->lcd_generation);
1114 if (CFS_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
1117 rc = tgt_client_data_update(env, exp);
1119 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
1120 tgt->lut_obd->obd_name, idx, rc);
1124 if (tgt_is_multimodrpcs_client(exp))
1125 atomic_inc(&tgt->lut_num_clients);
1129 EXPORT_SYMBOL(tgt_client_new);
1131 /* Add an existing client to the MDS in-memory state based on
1132 * a client that was previously found in the last_rcvd file and
1133 * already has an assigned slot (idx >= 0).
1135 * It should not be possible to fail adding an existing client - otherwise
1136 * mdt_init_server_data() callsite needs to be fixed.
 *
 * Unlike tgt_client_new(), the slot index is given, so only the bitmap
 * bit is claimed and the record offset recomputed; nothing is written.
 */
1138 int tgt_client_add(const struct lu_env *env, struct obd_export *exp, int idx)
1140 struct tg_export_data *ted = &exp->exp_target_data;
1141 struct lu_target *tgt = class_exp2tgt(exp);
1145 LASSERT(tgt && tgt->lut_client_bitmap != NULL);
1146 LASSERTF(idx >= 0, "%d\n", idx);
/* self-exports and lightweight connections own no last_rcvd slot */
1148 if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
1149 exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1152 if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1153 CERROR("%s: client %d: bit already set in bitmap!!\n",
1154 tgt->lut_obd->obd_name, idx);
1158 CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added, "
1160 tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid,
1161 ted->ted_lcd->lcd_generation);
1163 ted->ted_lr_idx = idx;
1164 ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1165 idx * tgt->lut_lsd.lsd_client_size;
1167 mutex_init(&ted->ted_lcd_lock);
1169 LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
/* Remove a client from last_rcvd on disconnect: update the server's
 * last_transno first (so it survives the record zeroing), then zero the
 * client's UUID in its slot and write it out. The record is kept intact
 * for recoverable clients (failover) and for self/lightweight/
 * no-recovery exports. Racing eviction vs. disconnect is resolved by
 * re-checking the UUID under ted_lcd_lock. */
1174 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
1176 struct tg_export_data *ted = &exp->exp_target_data;
1177 struct lu_target *tgt = class_exp2tgt(exp);
1182 LASSERT(ted->ted_lcd);
1184 if (unlikely(tgt == NULL)) {
1185 CDEBUG(D_ERROR, "%s: No target for connected export\n",
1186 class_exp2obd(exp)->obd_name);
1190 /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
1191 if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
1192 (char *)tgt->lut_obd->obd_uuid.uuid) ||
1193 exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT ||
1194 exp->exp_no_recovery)
1197 /* Slot may be not yet assigned, use case is race between Client
1198 * reconnect and forced eviction */
1199 if (ted->ted_lr_idx < 0) {
1200 CWARN("%s: client with UUID '%s' not in bitmap\n",
1201 tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid);
1205 CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
1206 tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1207 ted->ted_lcd->lcd_uuid);
1209 /* Clear the bit _after_ zeroing out the client so we don't
1210 race with filter_client_add and zero out new clients.*/
1211 if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
1212 CERROR("%s: client %u: bit already clear in bitmap!!\n",
1213 tgt->lut_obd->obd_name, ted->ted_lr_idx);
1217 /* Do not erase record for recoverable client. */
1218 if (exp->exp_flags & OBD_OPT_FAILOVER)
1221 if (CFS_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_DEL))
1224 /* Make sure the server's last_transno is up to date.
1225 * This should be done before zeroing client slot so last_transno will
1226 * be in server data or in client data in case of failure */
1227 rc = tgt_server_data_update(env, tgt, 0);
1229 CERROR("%s: failed to update server data, skip client %s "
1230 "zeroing, rc %d\n", tgt->lut_obd->obd_name,
1231 ted->ted_lcd->lcd_uuid, rc);
1235 /* Race between an eviction and a disconnection ?*/
1236 mutex_lock(&ted->ted_lcd_lock);
1237 if (ted->ted_lcd->lcd_uuid[0] == '\0') {
1238 mutex_unlock(&ted->ted_lcd_lock);
/* empty UUID marks the slot as free on disk */
1242 memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
1243 mutex_unlock(&ted->ted_lcd_lock);
1245 rc = tgt_client_data_update(env, exp);
1247 if (!rc && tgt_is_multimodrpcs_record(tgt, ted->ted_lcd))
1248 atomic_dec(&tgt->lut_num_clients);
1250 CDEBUG(rc == 0 ? D_INFO : D_ERROR,
1251 "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
1252 tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
1253 ted->ted_lr_idx, ted->ted_lr_off, rc);
1256 EXPORT_SYMBOL(tgt_client_del);
1258 static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
1260 struct tg_export_data *ted = &exp->exp_target_data;
1261 struct lu_target *lut = class_exp2tgt(exp);
1262 struct tg_reply_data *trd, *tmp;
1267 list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
1268 if (trd->trd_tag != tag)
1271 LASSERT(ergo(tgt_is_increasing_xid_client(exp),
1272 trd->trd_reply.lrd_xid <= xid));
1274 ted->ted_release_tag++;
1275 tgt_release_reply_data(lut, ted, trd);
1279 static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
1280 struct tg_export_data *ted, struct tg_reply_data *trd,
1281 struct ptlrpc_request *req,
1282 struct thandle *th, bool update_lrd_file)
1284 struct tgt_session_info *tsi = NULL;
1285 struct lsd_reply_data *lrd;
1289 lrd = &trd->trd_reply;
1290 /* update export last transno */
1291 mutex_lock(&ted->ted_lcd_lock);
1292 if (lrd->lrd_transno > ted->ted_lcd->lcd_last_transno)
1293 ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
1294 mutex_unlock(&ted->ted_lcd_lock);
1297 trd->trd_index = TRD_INDEX_MEMORY;
1298 GOTO(add_reply_data, rc = 0);
1302 tsi = tgt_ses_info(env);
1303 if (tsi->tsi_batch_trd) {
1304 LASSERT(tsi->tsi_batch_env);
1305 trd = tsi->tsi_batch_trd;
1311 /* find a empty slot */
1312 i = tgt_find_free_reply_slot(tgt);
1313 if (unlikely(i < 0)) {
1314 CERROR("%s: couldn't find a slot for reply data: rc = %d\n",
1321 if (update_lrd_file) {
1322 struct lsd_reply_header *lrh = &tgt->lut_reply_header;
1325 /* write reply data to disk */
1326 off = lrh->lrh_header_size + lrh->lrh_reply_size * i;
1327 rc = tgt_reply_data_write(env, tgt, lrd, off, th);
1328 if (unlikely(rc != 0)) {
1329 CERROR("%s: can't update %s file: rc = %d\n",
1330 tgt_name(tgt), REPLY_DATA, rc);
1331 GOTO(free_slot, rc);
1336 /* add reply data to target export's reply list */
1337 mutex_lock(&ted->ted_lcd_lock);
1339 int exclude = tgt_is_increasing_xid_client(req->rq_export) ?
1340 MSG_REPLAY : MSG_REPLAY|MSG_RESENT;
1342 if (req->rq_obsolete) {
1344 "drop reply data update for obsolete req xid=%llu,"
1345 "transno=%llu, tag=%hu\n", req->rq_xid,
1346 lrd->lrd_transno, trd->trd_tag);
1347 mutex_unlock(&ted->ted_lcd_lock);
1348 GOTO(free_slot, rc = -EBADR);
1351 if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude) &&
1352 !(tsi && tsi->tsi_batch_env &&
1353 trd->trd_reply.lrd_batch_idx > 0))
1354 tgt_clean_by_tag(req->rq_export, req->rq_xid,
1359 * For the batched RPC, all sub requests use one common @trd for the
1362 if (list_empty(&trd->trd_list)) {
1363 list_add(&trd->trd_list, &ted->ted_reply_list);
1364 ted->ted_reply_cnt++;
1365 if (ted->ted_reply_cnt > ted->ted_reply_max)
1366 ted->ted_reply_max = ted->ted_reply_cnt;
1368 mutex_unlock(&ted->ted_lcd_lock);
1370 CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
1371 "tag %hu, client gen %u, slot idx %d\n",
1372 trd, lrd->lrd_xid, lrd->lrd_transno,
1373 trd->trd_tag, lrd->lrd_client_gen, trd->trd_index);
1379 tgt_clear_reply_slot(tgt, trd->trd_index);
1383 int tgt_mk_reply_data(const struct lu_env *env,
1384 struct lu_target *tgt,
1385 struct tg_export_data *ted,
1386 struct ptlrpc_request *req,
1392 struct tg_reply_data *trd = NULL;
1393 struct lsd_reply_data *lrd;
1394 __u64 *pre_versions = NULL;
1395 struct tgt_session_info *tsi = NULL;
1399 tsi = tgt_ses_info(env);
1400 if (tsi->tsi_batch_trd) {
1401 LASSERT(tsi->tsi_batch_env);
1402 trd = tsi->tsi_batch_trd;
1408 if (unlikely(trd == NULL))
1411 INIT_LIST_HEAD(&trd->trd_list);
1414 /* fill reply data information */
1415 lrd = &trd->trd_reply;
1416 lrd->lrd_transno = transno;
1417 if (tsi && tsi->tsi_batch_env) {
1418 if (tsi->tsi_batch_idx == 0) {
1419 LASSERT(req != NULL);
1420 tsi->tsi_batch_trd = trd;
1421 trd->trd_index = -1;
1422 lrd->lrd_xid = req->rq_xid;
1423 trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
1424 lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
1426 lrd->lrd_batch_idx = tsi->tsi_batch_idx;
1427 } else if (req != NULL) {
1428 lrd->lrd_xid = req->rq_xid;
1429 trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
1430 lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
1432 pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1433 lrd->lrd_result = th->th_result;
1436 LASSERT(env != NULL);
1437 LASSERT(tsi->tsi_xid != 0);
1439 lrd->lrd_xid = tsi->tsi_xid;
1440 lrd->lrd_result = tsi->tsi_result;
1441 lrd->lrd_client_gen = tsi->tsi_client_gen;
1444 lrd->lrd_data = opdata;
1446 trd->trd_pre_versions[0] = pre_versions[0];
1447 trd->trd_pre_versions[1] = pre_versions[1];
1448 trd->trd_pre_versions[2] = pre_versions[2];
1449 trd->trd_pre_versions[3] = pre_versions[3];
1452 if (tsi && tsi->tsi_open_obj)
1453 trd->trd_object = *lu_object_fid(&tsi->tsi_open_obj->do_lu);
1455 rc = tgt_add_reply_data(env, tgt, ted, trd, req,
1465 EXPORT_SYMBOL(tgt_mk_reply_data);
1468 * last_rcvd & last_committed update callbacks
1470 static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
1471 struct dt_object *obj, __u64 opdata,
1472 struct thandle *th, struct ptlrpc_request *req)
1474 struct tgt_thread_info *tti = tgt_th_info(env);
1475 struct tgt_session_info *tsi = tgt_ses_info(env);
1476 struct obd_export *exp = tsi->tsi_exp;
1477 struct tg_export_data *ted;
1485 LASSERT(exp != NULL);
1486 ted = &exp->exp_target_data;
1488 /* Some clients don't support recovery, and they don't have last_rcvd
1490 * 1. lightweight clients.
1491 * 2. local clients on MDS which doesn't enable "localrecov".
1492 * 3. OFD connect may cause transaction before export has last_rcvd
1495 if (ted->ted_lr_idx < 0)
1499 tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1501 /* From update replay, tti_transno should be set already */
1502 LASSERT(tti->tti_transno != 0);
1504 spin_lock(&tgt->lut_translock);
1505 if (th->th_result != 0) {
1506 if (tti->tti_transno != 0) {
1507 CERROR("%s: replay transno %llu failed: rc = %d\n",
1508 tgt_name(tgt), tti->tti_transno, th->th_result);
1510 } else if (tti->tti_transno == 0) {
1511 tti->tti_transno = ++tgt->lut_last_transno;
1513 /* should be replay */
1514 if (tti->tti_transno > tgt->lut_last_transno)
1515 tgt->lut_last_transno = tti->tti_transno;
1517 spin_unlock(&tgt->lut_translock);
1519 /** VBR: set new versions */
1520 if (th->th_result == 0 && obj != NULL) {
1521 struct dt_object *dto = dt_object_locate(obj, th->th_dev);
1523 dt_version_set(env, dto, tti->tti_transno, th);
1524 if (unlikely(tsi->tsi_dv_update))
1525 dt_data_version_set(env, dto, tti->tti_transno, th);
1528 /* filling reply data */
1529 CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1530 tti->tti_transno, tgt->lut_obd->obd_last_committed);
1533 req->rq_transno = tti->tti_transno;
1534 lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
1537 /* if can't add callback, do sync write */
1538 th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp, tti->tti_transno);
1541 /* store transno in the last_rcvd header */
1542 spin_lock(&tgt->lut_translock);
1543 if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
1544 tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
1545 spin_unlock(&tgt->lut_translock);
1546 /* Although current connection doesn't have slot
1547 * in the last_rcvd, we still want to maintain
1548 * the in-memory lsd_client_data structure in order to
1549 * properly handle reply reconstruction. */
1550 rc = tgt_server_data_write(env, tgt, th);
1552 spin_unlock(&tgt->lut_translock);
1554 } else if (ted->ted_lr_off == 0) {
1555 CERROR("%s: client idx %d has offset %lld\n",
1556 tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
1560 /* Target that supports multiple reply data */
1561 if (tgt_is_multimodrpcs_client(exp)) {
1562 return tgt_mk_reply_data(env, tgt, ted, req, opdata, th,
1563 !!(req != NULL), tti->tti_transno);
1566 /* Enough for update replay, let's return */
1570 mutex_lock(&ted->ted_lcd_lock);
1571 LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1572 if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
1573 transno_p = &ted->ted_lcd->lcd_last_close_transno;
1574 ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
1575 ted->ted_lcd->lcd_last_close_result = th->th_result;
1577 /* VBR: save versions in last_rcvd for reconstruct. */
1578 __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1581 ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
1582 ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
1583 ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
1584 ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
1586 transno_p = &ted->ted_lcd->lcd_last_transno;
1587 ted->ted_lcd->lcd_last_xid = req->rq_xid;
1588 ted->ted_lcd->lcd_last_result = th->th_result;
1589 /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
1590 * see struct ldlm_reply->lock_policy_res1; */
1591 ted->ted_lcd->lcd_last_data = opdata;
1594 /* Update transno in slot only if non-zero number, i.e. no errors */
1595 if (likely(tti->tti_transno != 0)) {
1596 /* Don't overwrite bigger transaction number with lower one.
1597 * That is not sign of problem in all cases, but in any case
1598 * this value should be monotonically increased only. */
1599 if (*transno_p > tti->tti_transno) {
1600 if (!tgt->lut_no_reconstruct) {
1601 CERROR("%s: trying to overwrite bigger transno:"
1602 "on-disk: %llu, new: %llu replay: "
1603 "%d. See LU-617.\n", tgt_name(tgt),
1604 *transno_p, tti->tti_transno,
1605 req_is_replay(req));
1606 if (req_is_replay(req)) {
1607 spin_lock(&req->rq_export->exp_lock);
1608 req->rq_export->exp_vbr_failed = 1;
1609 spin_unlock(&req->rq_export->exp_lock);
1611 mutex_unlock(&ted->ted_lcd_lock);
1612 RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
1615 *transno_p = tti->tti_transno;
1620 tti->tti_off = ted->ted_lr_off;
1621 if (CFS_FAIL_CHECK(OBD_FAIL_TGT_RCVD_EIO))
1624 rc = tgt_client_data_write(env, tgt, ted->ted_lcd,
1627 mutex_unlock(&ted->ted_lcd_lock);
1631 mutex_unlock(&ted->ted_lcd_lock);
1636 * last_rcvd update for echo client simulation.
1637 * It updates last_rcvd client slot and version of object in
1638 * simple way but with all locks to simulate all drawbacks
1640 static int tgt_last_rcvd_update_echo(const struct lu_env *env,
1641 struct lu_target *tgt,
1642 struct dt_object *obj,
1644 struct obd_export *exp)
1646 struct tgt_thread_info *tti = tgt_th_info(env);
1647 struct tg_export_data *ted = &exp->exp_target_data;
1652 tti->tti_transno = 0;
1654 spin_lock(&tgt->lut_translock);
1655 if (th->th_result == 0)
1656 tti->tti_transno = ++tgt->lut_last_transno;
1657 spin_unlock(&tgt->lut_translock);
1659 /** VBR: set new versions */
1660 if (th->th_result == 0 && obj != NULL)
1661 dt_version_set(env, obj, tti->tti_transno, th);
1663 /* if can't add callback, do sync write */
1664 th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
1667 LASSERT(ted->ted_lr_off > 0);
1669 mutex_lock(&ted->ted_lcd_lock);
1670 LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1671 ted->ted_lcd->lcd_last_transno = tti->tti_transno;
1672 ted->ted_lcd->lcd_last_result = th->th_result;
1674 tti->tti_off = ted->ted_lr_off;
1675 rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
1676 mutex_unlock(&ted->ted_lcd_lock);
1680 static int tgt_clients_data_init(const struct lu_env *env,
1681 struct lu_target *tgt,
1682 unsigned long last_size)
1684 struct obd_device *obd = tgt->lut_obd;
1685 struct lr_server_data *lsd = &tgt->lut_lsd;
1686 struct lsd_client_data *lcd = NULL;
1687 struct tg_export_data *ted;
1690 loff_t off = lsd->lsd_client_start;
1691 __u32 generation = 0;
1692 struct cfs_hash *hash = NULL;
1696 if (tgt->lut_bottom->dd_rdonly)
1699 BUILD_BUG_ON(offsetof(struct lsd_client_data, lcd_padding) +
1700 sizeof(lcd->lcd_padding) != LR_CLIENT_SIZE);
1706 hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
1708 GOTO(err_out, rc = -ENODEV);
1710 for (cl_idx = 0; off < last_size; cl_idx++) {
1711 struct obd_export *exp;
1714 /* Don't assume off is incremented properly by
1715 * read_record(), in case sizeof(*lcd)
1716 * isn't the same as fsd->lsd_client_size. */
1717 off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
1718 rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
1720 CERROR("%s: error reading last_rcvd %s idx %d off "
1721 "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
1724 break; /* read error shouldn't cause startup to fail */
1727 if (lcd->lcd_uuid[0] == '\0') {
1728 CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
1733 last_transno = lcd_last_transno(lcd);
1735 /* These exports are cleaned up by disconnect, so they
1736 * need to be set up like real exports as connect does.
1738 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: %llu"
1739 " srv lr: %llu lx: %llu gen %u\n", lcd->lcd_uuid,
1740 cl_idx, last_transno, lsd->lsd_last_transno,
1741 lcd_last_xid(lcd), lcd->lcd_generation);
1743 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
1745 if (PTR_ERR(exp) == -EALREADY) {
1746 /* export already exists, zero out this one */
1747 CERROR("%s: Duplicate export %s!\n",
1748 tgt_name(tgt), lcd->lcd_uuid);
1751 GOTO(err_out, rc = PTR_ERR(exp));
1754 ted = &exp->exp_target_data;
1755 *ted->ted_lcd = *lcd;
1757 rc = tgt_client_add(env, exp, cl_idx);
1758 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
1759 /* VBR: set export last committed version */
1760 exp->exp_last_committed = last_transno;
1761 spin_lock(&exp->exp_lock);
1762 exp->exp_connecting = 0;
1763 exp->exp_in_recovery = 0;
1764 spin_unlock(&exp->exp_lock);
1765 atomic_inc(&obd->obd_max_recoverable_clients);
1767 if (tgt_is_multimodrpcs_record(tgt, lcd)) {
1768 atomic_inc(&tgt->lut_num_clients);
1770 /* compute the highest valid client generation */
1771 generation = max(generation, lcd->lcd_generation);
1772 /* fill client_generation <-> export hash table */
1773 rc = cfs_hash_add_unique(hash, &lcd->lcd_generation,
1774 &exp->exp_gen_hash);
1776 CERROR("%s: duplicate export for client "
1778 tgt_name(tgt), lcd->lcd_generation);
1779 class_export_put(exp);
1784 class_export_put(exp);
1786 rc = rev_import_init(exp);
1788 class_unlink_export(exp);
1792 /* Need to check last_rcvd even for duplicated exports. */
1793 CDEBUG(D_OTHER, "client at idx %d has last_transno = %llu\n",
1794 cl_idx, last_transno);
1796 spin_lock(&tgt->lut_translock);
1797 tgt->lut_last_transno = max(last_transno,
1798 tgt->lut_last_transno);
1799 spin_unlock(&tgt->lut_translock);
1802 /* record highest valid client generation */
1803 atomic_set(&tgt->lut_client_generation, generation);
1807 cfs_hash_putref(hash);
1812 struct server_compat_data {
1819 static struct server_compat_data tgt_scd[] = {
1820 [LDD_F_SV_TYPE_MDT] = {
1821 .rocompat = OBD_ROCOMPAT_LOVOBJID,
1822 .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1823 OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
1824 OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI |
1825 OBD_INCOMPAT_MULTI_RPCS,
1826 .rocinit = OBD_ROCOMPAT_LOVOBJID,
1827 .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1828 OBD_INCOMPAT_MULTI_OI,
1830 [LDD_F_SV_TYPE_OST] = {
1831 .rocompat = OBD_ROCOMPAT_IDX_IN_IDIF,
1832 .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
1834 .rocinit = OBD_ROCOMPAT_IDX_IN_IDIF,
1835 .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
1839 int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
1841 struct tgt_thread_info *tti = tgt_th_info(env);
1842 struct lr_server_data *lsd = &tgt->lut_lsd;
1843 unsigned long last_rcvd_size;
1847 rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr);
1851 last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
1853 /* ensure padding in the struct is the correct size */
1854 BUILD_BUG_ON(offsetof(struct lr_server_data, lsd_padding) +
1855 sizeof(lsd->lsd_padding) != LR_SERVER_SIZE);
1857 rc = server_name2index(tgt_name(tgt), &index, NULL);
1859 CERROR("%s: Can not get index from name: rc = %d\n",
1863 /* server_name2index() returns type */
1865 if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
1866 CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
1870 /* last_rcvd on OST doesn't provide reconstruct support because there
1871 * may be up to 8 in-flight write requests per single slot in
1872 * last_rcvd client data
1874 tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
1876 if (last_rcvd_size == 0) {
1877 LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
1879 memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
1880 sizeof(lsd->lsd_uuid));
1881 lsd->lsd_last_transno = 0;
1882 lsd->lsd_mount_count = 0;
1883 lsd->lsd_server_size = LR_SERVER_SIZE;
1884 lsd->lsd_client_start = LR_CLIENT_START;
1885 lsd->lsd_client_size = LR_CLIENT_SIZE;
1886 lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
1887 lsd->lsd_osd_index = index;
1888 lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
1889 lsd->lsd_feature_incompat = tgt_scd[type].incinit;
1891 rc = tgt_server_data_read(env, tgt);
1893 CERROR("%s: error reading LAST_RCVD: rc= %d\n",
1897 if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
1898 if (tgt->lut_bottom->dd_rdonly) {
1899 /* Such difference may be caused by mounting
1900 * up snapshot with new fsname under rd_only
1901 * mode. But even if it was NOT, it will not
1902 * damage the system because of "rd_only". */
1903 memcpy(lsd->lsd_uuid,
1904 tgt->lut_obd->obd_uuid.uuid,
1905 sizeof(lsd->lsd_uuid));
1907 LCONSOLE_ERROR_MSG(0x157, "Trying to start "
1908 "OBD %s using the wrong "
1909 "disk %s. Were the /dev/ "
1910 "assignments rearranged?\n",
1911 tgt->lut_obd->obd_uuid.uuid,
1917 if (lsd->lsd_osd_index != index) {
1918 LCONSOLE_ERROR_MSG(0x157,
1919 "%s: index %d in last rcvd is different with the index %d in config log, It might be disk corruption!\n",
1921 lsd->lsd_osd_index, index);
1926 if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
1927 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
1929 lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
1933 if (type == LDD_F_SV_TYPE_MDT)
1934 lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
1936 if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
1937 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
1939 lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
1942 /** Interop: evict all clients at first boot with 1.8 last_rcvd */
1943 if (type == LDD_F_SV_TYPE_MDT &&
1944 !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
1945 if (last_rcvd_size > lsd->lsd_client_start) {
1946 LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
1947 "remove all clients for interop needs\n",
1949 rc = tgt_truncate_object(env, tgt, tgt->lut_last_rcvd,
1950 lsd->lsd_client_start);
1953 last_rcvd_size = lsd->lsd_client_start;
1955 /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
1956 lsd->lsd_feature_compat |= OBD_COMPAT_20;
1959 spin_lock(&tgt->lut_translock);
1960 tgt->lut_last_transno = lsd->lsd_last_transno;
1961 spin_unlock(&tgt->lut_translock);
1963 lsd->lsd_mount_count++;
1965 CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
1966 CDEBUG(D_INODE, "%s: server last_transno: %llu\n",
1967 tgt_name(tgt), tgt->lut_last_transno);
1968 CDEBUG(D_INODE, "%s: server mount_count: %llu\n",
1969 tgt_name(tgt), lsd->lsd_mount_count);
1970 CDEBUG(D_INODE, "%s: server data size: %u\n",
1971 tgt_name(tgt), lsd->lsd_server_size);
1972 CDEBUG(D_INODE, "%s: per-client data start: %u\n",
1973 tgt_name(tgt), lsd->lsd_client_start);
1974 CDEBUG(D_INODE, "%s: per-client data size: %u\n",
1975 tgt_name(tgt), lsd->lsd_client_size);
1976 CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
1977 tgt_name(tgt), last_rcvd_size);
1978 CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
1979 tgt_name(tgt), lsd->lsd_subdir_count);
1980 CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
1981 last_rcvd_size <= lsd->lsd_client_start ? 0 :
1982 (last_rcvd_size - lsd->lsd_client_start) /
1983 lsd->lsd_client_size);
1984 CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
1986 if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
1987 lsd->lsd_client_size == 0) {
1988 CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
1992 if (!tgt->lut_obd->obd_replayable)
1993 CWARN("%s: recovery support OFF\n", tgt_name(tgt));
1995 rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
1997 GOTO(err_client, rc);
1999 spin_lock(&tgt->lut_translock);
2000 /* obd_last_committed is used for compatibility
2001 * with other lustre recovery code */
2002 tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
2003 spin_unlock(&tgt->lut_translock);
2005 obd2obt(tgt->lut_obd)->obt_mount_count = lsd->lsd_mount_count;
2006 obd2obt(tgt->lut_obd)->obt_instance = (__u32)lsd->lsd_mount_count;
2008 /* save it, so mount count and last_transno is current */
2009 rc = tgt_server_data_update(env, tgt, 0);
2011 GOTO(err_client, rc);
2016 class_disconnect_exports(tgt->lut_obd);
2020 /* add credits for last_rcvd update */
2021 int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
2024 struct lu_target *tgt = cookie;
2025 struct tgt_session_info *tsi;
2026 struct tgt_thread_info *tti = tgt_th_info(env);
2027 struct dt_object *dto;
2030 /* For readonly case, the caller should have got failure
2031 * when start the transaction. If the logic comes here,
2032 * there must be something wrong. */
2033 if (unlikely(tgt->lut_bottom->dd_rdonly)) {
2038 /* if there is no session, then this transaction is not result of
2039 * request processing but some local operation */
2040 if (env->le_ses == NULL)
2043 LASSERT(tgt->lut_last_rcvd);
2044 tsi = tgt_ses_info(env);
2045 /* OFD may start transaction without export assigned */
2046 if (tsi->tsi_exp == NULL)
2049 if (tgt_is_multimodrpcs_client(tsi->tsi_exp)) {
2051 * Use maximum possible file offset for declaration to ensure
2052 * ZFS will reserve enough credits for a write anywhere in this
2053 * file, since we don't know where in the file the write will be
2054 * because a replay slot has not been assigned. This should be
2055 * replaced by dmu_tx_hold_append() when available.
2057 tti->tti_buf.lb_buf = NULL;
2058 tti->tti_buf.lb_len = sizeof(struct lsd_reply_data);
2059 dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
2060 rc = dt_declare_record_write(env, dto, &tti->tti_buf, -1, th);
2064 dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
2066 tti->tti_off = tsi->tsi_exp->exp_target_data.ted_lr_off;
2067 rc = dt_declare_record_write(env, dto, &tti->tti_buf,
2073 if (tsi->tsi_vbr_obj != NULL &&
2074 !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
2075 dto = dt_object_locate(tsi->tsi_vbr_obj, th->th_dev);
2076 rc = dt_declare_version_set(env, dto, th);
2077 if (!rc && tsi->tsi_dv_update)
2078 rc = dt_declare_data_version_set(env, dto, th);
2084 /* Update last_rcvd records with latests transaction data */
2085 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
2088 struct lu_target *tgt = cookie;
2089 struct tgt_session_info *tsi;
2090 struct tgt_thread_info *tti = tgt_th_info(env);
2091 struct dt_object *obj = NULL;
2095 if (env->le_ses == NULL)
2098 tsi = tgt_ses_info(env);
2099 /* OFD may start transaction without export assigned */
2100 if (tsi->tsi_exp == NULL)
2103 echo_client = (tgt_ses_req(tsi) == NULL && tsi->tsi_xid == 0);
2105 if (tsi->tsi_has_trans && !echo_client && !tsi->tsi_batch_env) {
2106 if (!tsi->tsi_mult_trans) {
2107 CDEBUG(D_HA, "More than one transaction %llu\n",
2110 * if RPC handler sees unexpected multiple last_rcvd
2111 * updates with transno, then it is better to return
2112 * the latest transaction number to the client.
2113 * In that case replay may fail if part of operation
2114 * was committed and can't be re-applied easily. But
2115 * that is better than report the first transno, in
2116 * which case partially committed operation would be
2117 * considered as finished so never replayed causing
2121 /* we need new transno to be assigned */
2122 tti->tti_transno = 0;
2126 tsi->tsi_has_trans++;
2128 if (tsi->tsi_vbr_obj != NULL &&
2129 !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
2130 obj = tsi->tsi_vbr_obj;
2133 if (unlikely(echo_client)) /* echo client special case */
2134 rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
2137 rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
2142 int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
2144 struct tgt_thread_info *tti = tgt_th_info(env);
2145 struct lsd_reply_data *lrd = &tti->tti_lrd;
2146 unsigned long reply_data_size;
2148 struct lsd_reply_header *lrh = &tgt->lut_reply_header;
2149 struct tg_reply_data *trd = NULL;
2152 struct cfs_hash *hash = NULL;
2153 struct obd_export *exp;
2154 struct tg_export_data *ted;
2155 int reply_data_recovered = 0;
2157 rc = dt_attr_get(env, tgt->lut_reply_data, &tti->tti_attr);
2160 reply_data_size = (unsigned long)tti->tti_attr.la_size;
2162 if (reply_data_size == 0) {
2163 CDEBUG(D_INFO, "%s: new reply_data file, initializing\n",
2165 lrh->lrh_magic = LRH_MAGIC;
2166 lrh->lrh_header_size = sizeof(struct lsd_reply_header);
2167 if (lrh->lrh_magic == LRH_MAGIC_V1)
2168 lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v1);
2170 lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v2);
2171 rc = tgt_reply_header_write(env, tgt, lrh);
2173 CERROR("%s: error writing %s: rc = %d\n",
2174 tgt_name(tgt), REPLY_DATA, rc);
2178 __u32 recsz = sizeof(*lrd);
2179 const char *lrd_ver = "v2";
2181 rc = tgt_reply_header_read(env, tgt, lrh);
2183 CERROR("%s: error reading %s: rc = %d\n",
2184 tgt_name(tgt), REPLY_DATA, rc);
2188 switch (lrh->lrh_magic) {
2189 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 5, 53, 0)
2190 /* The old reply_data is replaced on the first mount after
2191 * an upgrade, so no need to keep this interop code forever.
2194 recsz = sizeof(struct lsd_reply_data_v1);
2197 if (lrh->lrh_magic != LRH_MAGIC)
2198 CWARN("%s: %s record size will be %s\n",
2199 tgt_name(tgt), REPLY_DATA,
2200 lrh->lrh_magic < LRH_MAGIC ?
2201 "upgraded" : "downgraded");
2205 if (lrh->lrh_header_size != sizeof(*lrh)) {
2206 CERROR("%s: bad %s %s header size: %u != %lu\n",
2207 tgt_name(tgt), REPLY_DATA, lrd_ver,
2208 lrh->lrh_header_size, sizeof(*lrh));
2209 GOTO(out, rc = -EINVAL);
2211 if (lrh->lrh_reply_size != recsz) {
2212 CERROR("%s: bad %s %s reply size: %u != %u\n",
2213 tgt_name(tgt), REPLY_DATA, lrd_ver,
2214 lrh->lrh_reply_size, recsz);
2215 GOTO(out, rc = -EINVAL);
2219 CERROR("%s: invalid %s magic: %x != %x/%x\n",
2220 tgt_name(tgt), REPLY_DATA,
2221 lrh->lrh_magic, LRH_MAGIC_V1, LRH_MAGIC_V2);
2222 GOTO(out, rc = -EINVAL);
2225 hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
2227 GOTO(out, rc = -ENODEV);
2231 GOTO(out, rc = -ENOMEM);
2233 /* Load reply_data from disk */
2234 for (idx = 0, off = lrh->lrh_header_size;
2235 off < reply_data_size; idx++, off += recsz) {
2236 rc = tgt_reply_data_read(env, tgt, lrd, off, lrh);
2238 CERROR("%s: error reading %s: rc = %d\n",
2239 tgt_name(tgt), REPLY_DATA, rc);
2243 exp = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
2245 /* old reply data from a disconnected client */
2248 ted = &exp->exp_target_data;
2249 mutex_lock(&ted->ted_lcd_lock);
2251 /* create in-memory reply_data and link it to
2252 * target export's reply list */
2253 rc = tgt_set_reply_slot(tgt, idx);
2255 mutex_unlock(&ted->ted_lcd_lock);
2258 trd->trd_reply = *lrd;
2259 trd->trd_pre_versions[0] = 0;
2260 trd->trd_pre_versions[1] = 0;
2261 trd->trd_pre_versions[2] = 0;
2262 trd->trd_pre_versions[3] = 0;
2263 trd->trd_index = idx;
2265 fid_zero(&trd->trd_object);
2266 list_add(&trd->trd_list, &ted->ted_reply_list);
2267 ted->ted_reply_cnt++;
2268 if (ted->ted_reply_cnt > ted->ted_reply_max)
2269 ted->ted_reply_max = ted->ted_reply_cnt;
2271 CDEBUG(D_HA, "%s: restore reply %p: xid %llu, "
2272 "transno %llu, client gen %u, slot idx %d\n",
2273 tgt_name(tgt), trd, lrd->lrd_xid,
2274 lrd->lrd_transno, lrd->lrd_client_gen,
2277 /* update export last committed transation */
2278 exp->exp_last_committed = max(exp->exp_last_committed,
2280 /* Update lcd_last_transno as well for check in
2281 * tgt_release_reply_data() or the latest client
2282 * transno can be lost.
2284 ted->ted_lcd->lcd_last_transno =
2285 max(ted->ted_lcd->lcd_last_transno,
2286 exp->exp_last_committed);
2288 mutex_unlock(&ted->ted_lcd_lock);
2289 class_export_put(exp);
2291 /* update target last committed transaction */
2292 spin_lock(&tgt->lut_translock);
2293 tgt->lut_last_transno = max(tgt->lut_last_transno,
2295 spin_unlock(&tgt->lut_translock);
2297 reply_data_recovered++;
2301 GOTO(out, rc = -ENOMEM);
2303 CDEBUG(D_INFO, "%s: %d reply data have been recovered\n",
2304 tgt_name(tgt), reply_data_recovered);
2307 spin_lock(&tgt->lut_translock);
2308 /* obd_last_committed is used for compatibility
2309 * with other lustre recovery code */
2310 tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
2311 spin_unlock(&tgt->lut_translock);
2317 cfs_hash_putref(hash);
2323 static int tgt_check_lookup_req(struct ptlrpc_request *req, int lookup,
2324 struct tg_reply_data *trd)
2326 struct tg_export_data *ted = &req->rq_export->exp_target_data;
2327 struct lu_target *lut = class_exp2tgt(req->rq_export);
2328 __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
2330 struct tg_reply_data *reply;
2331 bool check_increasing;
2336 check_increasing = tgt_is_increasing_xid_client(req->rq_export) &&
2337 !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2338 if (!lookup && !check_increasing)
2341 list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
2342 if (lookup && reply->trd_reply.lrd_xid == req->rq_xid) {
2347 } else if (check_increasing && reply->trd_tag == tag &&
2348 reply->trd_reply.lrd_xid > req->rq_xid) {
2350 CERROR("%s: busy tag=%u req_xid=%llu, trd=%p: xid=%llu transno=%llu client_gen=%u slot_idx=%d: rc = %d\n",
2351 tgt_name(lut), tag, req->rq_xid, trd,
2352 reply->trd_reply.lrd_xid,
2353 reply->trd_reply.lrd_transno,
2354 reply->trd_reply.lrd_client_gen,
2355 reply->trd_index, rc);
2363 /* Look for a reply data matching specified request @req
2364 * A copy is returned in @trd if the pointer is not NULL
2366 int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
2368 struct tg_export_data *ted = &req->rq_export->exp_target_data;
2370 bool not_replay = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2372 mutex_lock(&ted->ted_lcd_lock);
2373 if (not_replay && req->rq_xid <= req->rq_export->exp_last_xid) {
2374 /* A check for the last_xid is needed here in case there is
2375 * no reply data is left in the list. It may happen if another
2376 * RPC on another slot increased the last_xid between our
2377 * process_req_last_xid & tgt_lookup_reply calls */
2380 found = tgt_check_lookup_req(req, 1, trd);
2382 mutex_unlock(&ted->ted_lcd_lock);
2384 CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d last_xid %llu\n",
2385 tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, found,
2386 req->rq_export->exp_last_xid);
2390 EXPORT_SYMBOL(tgt_lookup_reply);
2392 int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
2394 struct tg_export_data *ted = &exp->exp_target_data;
2395 struct lu_target *lut = class_exp2tgt(exp);
2396 struct tg_reply_data *trd, *tmp;
2399 list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
2400 if (trd->trd_reply.lrd_xid > rcvd_xid)
2402 ted->ted_release_xid++;
2403 tgt_release_reply_data(lut, ted, trd);
2409 int tgt_handle_tag(struct ptlrpc_request *req)
2411 return tgt_check_lookup_req(req, 0, NULL);