Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * Lustre Unified Target
32  * These are common function to work with last_received file
33  *
34  * Author: Mikhail Pershin <mike.pershin@intel.com>
35  */
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_fid.h>
39
40 #include "tgt_internal.h"
41
42 /** version recovery epoch */
43 #define LR_EPOCH_BITS   32
44
45 /* Allocate a bitmap for a chunk of reply data slots */
46 static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
47 {
48         unsigned long *bm;
49
50         OBD_ALLOC_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
51                         sizeof(long));
52         if (bm == NULL)
53                 return -ENOMEM;
54
55         spin_lock(&lut->lut_client_bitmap_lock);
56
57         if (lut->lut_reply_bitmap[chunk] != NULL) {
58                 /* someone else already allocated the bitmap for this chunk */
59                 spin_unlock(&lut->lut_client_bitmap_lock);
60                 OBD_FREE_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
61                          sizeof(long));
62                 return 0;
63         }
64
65         lut->lut_reply_bitmap[chunk] = bm;
66
67         spin_unlock(&lut->lut_client_bitmap_lock);
68
69         return 0;
70 }
71
72 /* Look for an available reply data slot in the bitmap
73  * of the target @lut
74  * Allocate bitmap chunk when first used
75  * XXX algo could be improved if this routine limits performance
76  */
77 static int tgt_find_free_reply_slot(struct lu_target *lut)
78 {
79         unsigned long *bmp;
80         int chunk = 0;
81         int rc;
82         int b;
83
84         for (chunk = 0; chunk < LUT_REPLY_SLOTS_MAX_CHUNKS; chunk++) {
85                 /* allocate the bitmap chunk if necessary */
86                 if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
87                         rc = tgt_bitmap_chunk_alloc(lut, chunk);
88                         if (rc != 0)
89                                 return rc;
90                 }
91                 bmp = lut->lut_reply_bitmap[chunk];
92
93                 /* look for an available slot in this chunk */
94                 do {
95                         b = find_first_zero_bit(bmp, LUT_REPLY_SLOTS_PER_CHUNK);
96                         if (b >= LUT_REPLY_SLOTS_PER_CHUNK)
97                                 break;
98
99                         /* found one */
100                         if (test_and_set_bit(b, bmp) == 0)
101                                 return chunk * LUT_REPLY_SLOTS_PER_CHUNK + b;
102                 } while (true);
103         }
104
105         return -ENOSPC;
106 }
107
108 /* Mark the reply data slot @idx 'used' in the corresponding bitmap chunk
109  * of the target @lut
110  * Allocate the bitmap chunk if necessary
111  */
112 static int tgt_set_reply_slot(struct lu_target *lut, int idx)
113 {
114         int chunk;
115         int b;
116         int rc;
117
118         chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
119         b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
120
121         LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
122         LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
123
124         /* allocate the bitmap chunk if necessary */
125         if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
126                 rc = tgt_bitmap_chunk_alloc(lut, chunk);
127                 if (rc != 0)
128                         return rc;
129         }
130
131         /* mark the slot 'used' in this chunk */
132         if (test_and_set_bit(b, lut->lut_reply_bitmap[chunk]) != 0) {
133                 CERROR("%s: slot %d already set in bitmap\n",
134                        tgt_name(lut), idx);
135                 return -EALREADY;
136         }
137
138         return 0;
139 }
140
141
142 /* Mark the reply data slot @idx 'unused' in the corresponding bitmap chunk
143  * of the target @lut
144  */
145 static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
146 {
147         int chunk;
148         int b;
149
150         if (lut->lut_obd->obd_stopping)
151                 /*
152                  * in case of failover keep the bit set in order to
153                  * avoid overwriting slots in reply_data which might
154                  * be required by resent rpcs
155                  */
156                 return 0;
157         chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
158         b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
159
160         LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
161         LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
162
163         if (lut->lut_reply_bitmap[chunk] == NULL) {
164                 CERROR("%s: slot %d not allocated\n",
165                        tgt_name(lut), idx);
166                 return -ENOENT;
167         }
168
169         if (test_and_clear_bit(b, lut->lut_reply_bitmap[chunk]) == 0) {
170                 CERROR("%s: slot %d already clear in bitmap\n",
171                        tgt_name(lut), idx);
172                 return -EALREADY;
173         }
174
175         return 0;
176 }
177
178
179 /* Read header of reply_data file of target @tgt into structure @lrh */
180 static int tgt_reply_header_read(const struct lu_env *env,
181                                  struct lu_target *tgt,
182                                  struct lsd_reply_header *lrh)
183 {
184         int                      rc;
185         struct lsd_reply_header  buf;
186         struct tgt_thread_info  *tti = tgt_th_info(env);
187
188         tti->tti_off = 0;
189         tti->tti_buf.lb_buf = &buf;
190         tti->tti_buf.lb_len = sizeof(buf);
191
192         rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
193                             &tti->tti_off);
194         if (rc != 0)
195                 return rc;
196
197         lrh->lrh_magic = le32_to_cpu(buf.lrh_magic);
198         lrh->lrh_header_size = le32_to_cpu(buf.lrh_header_size);
199         lrh->lrh_reply_size = le32_to_cpu(buf.lrh_reply_size);
200
201         CDEBUG(D_HA, "%s: read %s header. magic=0x%08x "
202                "header_size=%d reply_size=%d\n",
203                 tgt->lut_obd->obd_name, REPLY_DATA,
204                 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
205
206         return 0;
207 }
208
209 /* Write header into replay_data file of target @tgt from structure @lrh */
210 static int tgt_reply_header_write(const struct lu_env *env,
211                                   struct lu_target *tgt,
212                                   struct lsd_reply_header *lrh)
213 {
214         int                      rc;
215         struct lsd_reply_header  buf;
216         struct tgt_thread_info  *tti = tgt_th_info(env);
217         struct thandle          *th;
218         struct dt_object        *dto;
219
220         CDEBUG(D_HA, "%s: write %s header. magic=0x%08x "
221                "header_size=%d reply_size=%d\n",
222                 tgt->lut_obd->obd_name, REPLY_DATA,
223                 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
224
225         if (tgt->lut_bottom->dd_rdonly)
226                 RETURN(0);
227
228         buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
229         buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
230         buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
231
232         th = dt_trans_create(env, tgt->lut_bottom);
233         if (IS_ERR(th))
234                 return PTR_ERR(th);
235         th->th_sync = 1;
236
237         tti->tti_off = 0;
238         tti->tti_buf.lb_buf = &buf;
239         tti->tti_buf.lb_len = sizeof(buf);
240
241         rc = dt_declare_record_write(env, tgt->lut_reply_data,
242                                      &tti->tti_buf, tti->tti_off, th);
243         if (rc)
244                 GOTO(out, rc);
245
246         rc = dt_trans_start(env, tgt->lut_bottom, th);
247         if (rc)
248                 GOTO(out, rc);
249
250         dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
251         rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
252 out:
253         dt_trans_stop(env, tgt->lut_bottom, th);
254         return rc;
255 }
256
257 /* Write the reply data @lrd into reply_data file of target @tgt
258  * at offset @off
259  */
260 static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
261                                 struct lsd_reply_data *lrd, loff_t off,
262                                 struct thandle *th)
263 {
264         struct tgt_thread_info *tti = tgt_th_info(env);
265         struct lsd_reply_data *buf = &tti->tti_lrd;
266         struct lsd_reply_header *lrh = &tgt->lut_reply_header;
267         struct dt_object *dto;
268
269         lrd->lrd_result = ptlrpc_status_hton(lrd->lrd_result);
270
271         buf->lrd_transno         = cpu_to_le64(lrd->lrd_transno);
272         buf->lrd_xid             = cpu_to_le64(lrd->lrd_xid);
273         buf->lrd_data            = cpu_to_le64(lrd->lrd_data);
274         buf->lrd_result          = cpu_to_le32(lrd->lrd_result);
275         buf->lrd_client_gen      = cpu_to_le32(lrd->lrd_client_gen);
276
277         lrd->lrd_result = ptlrpc_status_ntoh(lrd->lrd_result);
278
279         if (lrh->lrh_magic > LRH_MAGIC_V1)
280                 buf->lrd_batch_idx = cpu_to_le32(lrd->lrd_batch_idx);
281
282         tti->tti_off = off;
283         tti->tti_buf.lb_buf = buf;
284         tti->tti_buf.lb_len = lrh->lrh_reply_size;
285
286         dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
287         return dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
288 }
289
290 /* Read the reply data from reply_data file of target @tgt at offset @off
291  * into structure @lrd
292  */
293 static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt,
294                                struct lsd_reply_data *lrd, loff_t off,
295                                struct lsd_reply_header *lrh)
296 {
297         struct tgt_thread_info *tti = tgt_th_info(env);
298         struct lsd_reply_data *buf = &tti->tti_lrd;
299         int rc;
300
301         tti->tti_off = off;
302         tti->tti_buf.lb_buf = buf;
303         tti->tti_buf.lb_len = lrh->lrh_reply_size;
304
305         rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
306                             &tti->tti_off);
307         if (rc != 0)
308                 return rc;
309
310         lrd->lrd_transno = le64_to_cpu(buf->lrd_transno);
311         lrd->lrd_xid = le64_to_cpu(buf->lrd_xid);
312         lrd->lrd_data = le64_to_cpu(buf->lrd_data);
313         lrd->lrd_result = le32_to_cpu(buf->lrd_result);
314         lrd->lrd_client_gen = le32_to_cpu(buf->lrd_client_gen);
315
316         if (lrh->lrh_magic > LRH_MAGIC_V1)
317                 lrd->lrd_batch_idx = le32_to_cpu(buf->lrd_batch_idx);
318         else
319                 lrd->lrd_batch_idx = 0;
320
321         return 0;
322 }
323
324 /* Free the in-memory reply data structure @trd and release
325  * the corresponding slot in the reply_data file of target @lut
326  * Called with ted_lcd_lock held
327  */
328 static void tgt_free_reply_data(struct lu_target *lut,
329                                 struct tg_export_data *ted,
330                                 struct tg_reply_data *trd)
331 {
332         CDEBUG(D_TRACE, "%s: free reply data %p: xid %llu, transno %llu, "
333                "client gen %u, slot idx %d\n",
334                lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
335                trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
336                trd->trd_index);
337
338         LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
339
340         list_del(&trd->trd_list);
341         ted->ted_reply_cnt--;
342         if (lut != NULL && trd->trd_index != TRD_INDEX_MEMORY)
343                 tgt_clear_reply_slot(lut, trd->trd_index);
344         OBD_FREE_PTR(trd);
345 }
346
347 /* Release the reply data @trd from target @lut
348  * The reply data with the highest transno for this export
349  * is retained to ensure correctness of target recovery
350  * Called with ted_lcd_lock held
351  */
352 static void tgt_release_reply_data(struct lu_target *lut,
353                                    struct tg_export_data *ted,
354                                    struct tg_reply_data *trd)
355 {
356         CDEBUG(D_TRACE, "%s: release reply data %p: xid %llu, transno %llu, "
357                "client gen %u, slot idx %d\n",
358                lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
359                trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
360                trd->trd_index);
361
362         LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
363
364         /* Do not free the reply data corresponding to the
365          * highest transno of this export.
366          * This ensures on-disk reply data is kept and
367          * last committed transno can be restored from disk in case
368          * of target recovery
369          */
370         if (trd->trd_reply.lrd_transno == ted->ted_lcd->lcd_last_transno) {
371                 /* free previous retained reply */
372                 if (ted->ted_reply_last != NULL)
373                         tgt_free_reply_data(lut, ted, ted->ted_reply_last);
374                 /* retain the reply */
375                 list_del_init(&trd->trd_list);
376                 ted->ted_reply_last = trd;
377         } else {
378                 tgt_free_reply_data(lut, ted, trd);
379         }
380 }
381
382 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
383 {
384         tti->tti_buf.lb_buf = &tti->tti_lsd;
385         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
386         return &tti->tti_buf;
387 }
388
389 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
390 {
391         tti->tti_buf.lb_buf = &tti->tti_lcd;
392         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
393         return &tti->tti_buf;
394 }
395
396 static inline bool tgt_is_multimodrpcs_record(struct lu_target *tgt,
397                                               struct lsd_client_data *lcd)
398 {
399         return tgt->lut_lsd.lsd_feature_incompat & OBD_INCOMPAT_MULTI_RPCS &&
400                 lcd->lcd_generation != 0;
401 }
402
403 /**
404  * Allocate in-memory data for client slot related to export.
405  */
406 int tgt_client_alloc(struct obd_export *exp)
407 {
408         ENTRY;
409         LASSERT(exp != exp->exp_obd->obd_self_export);
410
411         spin_lock_init(&exp->exp_target_data.ted_nodemap_lock);
412         INIT_LIST_HEAD(&exp->exp_target_data.ted_nodemap_member);
413         spin_lock_init(&exp->exp_target_data.ted_fmd_lock);
414         INIT_LIST_HEAD(&exp->exp_target_data.ted_fmd_list);
415
416         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
417         if (exp->exp_target_data.ted_lcd == NULL)
418                 RETURN(-ENOMEM);
419         /* Mark that slot is not yet valid, 0 doesn't work here */
420         exp->exp_target_data.ted_lr_idx = -1;
421         INIT_LIST_HEAD(&exp->exp_target_data.ted_reply_list);
422         mutex_init(&exp->exp_target_data.ted_lcd_lock);
423         RETURN(0);
424 }
425 EXPORT_SYMBOL(tgt_client_alloc);
426
427 /**
428  * Free in-memory data for client slot related to export.
429  */
430 void tgt_client_free(struct obd_export *exp)
431 {
432         struct tg_export_data   *ted = &exp->exp_target_data;
433         struct lu_target        *lut = class_exp2tgt(exp);
434         struct tg_reply_data    *trd, *tmp;
435
436         LASSERT(exp != exp->exp_obd->obd_self_export);
437
438         tgt_fmd_cleanup(exp);
439
440         /* free reply data */
441         mutex_lock(&ted->ted_lcd_lock);
442         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
443                 tgt_release_reply_data(lut, ted, trd);
444         }
445         if (ted->ted_reply_last != NULL) {
446                 tgt_free_reply_data(lut, ted, ted->ted_reply_last);
447                 ted->ted_reply_last = NULL;
448         }
449         mutex_unlock(&ted->ted_lcd_lock);
450
451         if (!hlist_unhashed(&exp->exp_gen_hash))
452                 cfs_hash_del(exp->exp_obd->obd_gen_hash,
453                              &ted->ted_lcd->lcd_generation,
454                              &exp->exp_gen_hash);
455
456         OBD_FREE_PTR(ted->ted_lcd);
457         ted->ted_lcd = NULL;
458
459         /* Target may have been freed (see LU-7430)
460          * Slot may be not yet assigned */
461         if (((struct obd_device_target *)(&exp->exp_obd->u))->obt_magic !=
462             OBT_MAGIC ||
463             ted->ted_lr_idx < 0)
464                 return;
465
466         /* Clear bit when lcd is freed */
467         LASSERT(lut && lut->lut_client_bitmap);
468         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
469                 CERROR("%s: client %u bit already clear in bitmap\n",
470                        exp->exp_obd->obd_name, ted->ted_lr_idx);
471                 LBUG();
472         }
473 }
474 EXPORT_SYMBOL(tgt_client_free);
475
476 static inline void tgt_check_lcd(const char *obd_name, int index,
477                                  struct lsd_client_data *lcd)
478 {
479         size_t uuid_size = sizeof(lcd->lcd_uuid);
480
481         if (strnlen((char*)lcd->lcd_uuid, uuid_size) == uuid_size) {
482                 lcd->lcd_uuid[uuid_size - 1] = '\0';
483
484                 LCONSOLE_ERROR("the client UUID (%s) on %s for exports stored in last_rcvd(index = %d) is bad!\n",
485                                lcd->lcd_uuid, obd_name, index);
486         }
487 }
488
489 static int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
490                                 struct lsd_client_data *lcd,
491                                 loff_t *off, int index)
492 {
493         struct tgt_thread_info  *tti = tgt_th_info(env);
494         int                      rc;
495
496         tti_buf_lcd(tti);
497         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
498         if (rc == 0) {
499                 tgt_check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
500                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
501                 lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
502                 lcd->lcd_last_close_result =
503                         ptlrpc_status_ntoh(lcd->lcd_last_close_result);
504         }
505
506         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = %llu"
507                ", last_xid = %llu, last_result = %u, last_data = %u, "
508                "last_close_transno = %llu, last_close_xid = %llu, "
509                "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
510                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
511                lcd->lcd_last_result, lcd->lcd_last_data,
512                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
513                lcd->lcd_last_close_result, rc);
514         return rc;
515 }
516
517 static int tgt_client_data_write(const struct lu_env *env,
518                                  struct lu_target *tgt,
519                                  struct lsd_client_data *lcd,
520                                  loff_t *off, struct thandle *th)
521 {
522         struct tgt_thread_info *tti = tgt_th_info(env);
523         struct dt_object        *dto;
524
525         lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
526         lcd->lcd_last_close_result =
527                 ptlrpc_status_hton(lcd->lcd_last_close_result);
528         lcd_cpu_to_le(lcd, &tti->tti_lcd);
529         tti_buf_lcd(tti);
530
531         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
532         return dt_record_write(env, dto, &tti->tti_buf, off, th);
533 }
534
/*
 * Context of the commit callback registered by tgt_new_client_cb_add() and
 * run as tgt_cb_new_client().
 */
struct tgt_new_client_callback {
        /* callback descriptor handed to dt_trans_cb_add() */
        struct dt_txn_commit_cb  lncc_cb;
        /* export obtained with class_export_cb_get(); put by the callback */
        struct obd_export       *lncc_exp;
};
539
540 static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
541                               struct dt_txn_commit_cb *cb, int err)
542 {
543         struct tgt_new_client_callback *ccb;
544
545         ccb = container_of(cb, struct tgt_new_client_callback, lncc_cb);
546
547         LASSERT(ccb->lncc_exp->exp_obd);
548
549         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
550                ccb->lncc_exp->exp_obd->obd_name,
551                ccb->lncc_exp->exp_client_uuid.uuid);
552
553         spin_lock(&ccb->lncc_exp->exp_lock);
554
555         ccb->lncc_exp->exp_need_sync = 0;
556
557         spin_unlock(&ccb->lncc_exp->exp_lock);
558         class_export_cb_put(ccb->lncc_exp);
559
560         OBD_FREE_PTR(ccb);
561 }
562
563 static int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
564 {
565         struct tgt_new_client_callback *ccb;
566         struct dt_txn_commit_cb *dcb;
567         int rc;
568
569         OBD_ALLOC_PTR(ccb);
570         if (ccb == NULL)
571                 return -ENOMEM;
572
573         ccb->lncc_exp = class_export_cb_get(exp);
574
575         dcb = &ccb->lncc_cb;
576         dcb->dcb_func = tgt_cb_new_client;
577         INIT_LIST_HEAD(&dcb->dcb_linkage);
578         strscpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
579
580         rc = dt_trans_cb_add(th, dcb);
581         if (rc) {
582                 class_export_cb_put(exp);
583                 OBD_FREE_PTR(ccb);
584         }
585         return rc;
586 }
587
588 /**
589  * Update client data in last_rcvd
590  */
591 static int tgt_client_data_update(const struct lu_env *env,
592                                   struct obd_export *exp)
593 {
594         struct tg_export_data   *ted = &exp->exp_target_data;
595         struct lu_target        *tgt = class_exp2tgt(exp);
596         struct tgt_thread_info  *tti = tgt_th_info(env);
597         struct thandle          *th;
598         int                      rc = 0;
599
600         ENTRY;
601
602         if (unlikely(tgt == NULL)) {
603                 CDEBUG(D_ERROR, "%s: No target for connected export\n",
604                           class_exp2obd(exp)->obd_name);
605                 RETURN(-EINVAL);
606         }
607
608         if (tgt->lut_bottom->dd_rdonly)
609                 RETURN(0);
610
611         th = dt_trans_create(env, tgt->lut_bottom);
612         if (IS_ERR(th))
613                 RETURN(PTR_ERR(th));
614
615         tti_buf_lcd(tti);
616         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
617                                      &tti->tti_buf,
618                                      ted->ted_lr_off, th);
619         if (rc)
620                 GOTO(out, rc);
621
622         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
623         if (rc)
624                 GOTO(out, rc);
625
626         mutex_lock(&ted->ted_lcd_lock);
627
628         /*
629          * Until this operations will be committed the sync is needed
630          * for this export. This should be done _after_ starting the
631          * transaction so that many connecting clients will not bring
632          * server down with lots of sync writes.
633          */
634         rc = tgt_new_client_cb_add(th, exp);
635         if (rc) {
636                 /* can't add callback, do sync now */
637                 th->th_sync = 1;
638         } else {
639                 spin_lock(&exp->exp_lock);
640                 exp->exp_need_sync = 1;
641                 spin_unlock(&exp->exp_lock);
642         }
643
644         tti->tti_off = ted->ted_lr_off;
645         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
646
647         mutex_unlock(&ted->ted_lcd_lock);
648
649         EXIT;
650 out:
651         dt_trans_stop(env, tgt->lut_bottom, th);
652         CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
653                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
654                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
655
656         return rc;
657 }
658
659 static int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
660 {
661         struct tgt_thread_info  *tti = tgt_th_info(env);
662         int                      rc;
663
664         tti->tti_off = 0;
665         tti_buf_lsd(tti);
666         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
667                             &tti->tti_off);
668         if (rc == 0)
669                 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
670
671         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
672                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
673                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
674         return rc;
675 }
676
677 static int tgt_server_data_write(const struct lu_env *env,
678                                  struct lu_target *tgt, struct thandle *th)
679 {
680         struct tgt_thread_info  *tti = tgt_th_info(env);
681         struct dt_object        *dto;
682         int                      rc;
683
684         ENTRY;
685
686         tti->tti_off = 0;
687         tti_buf_lsd(tti);
688         lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
689
690         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
691         rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
692
693         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
694                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
695                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
696
697         RETURN(rc);
698 }
699
700 /**
701  * Update server data in last_rcvd
702  */
703 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
704                            int sync)
705 {
706         struct tgt_thread_info  *tti = tgt_th_info(env);
707         struct thandle          *th;
708         int                      rc = 0;
709
710         ENTRY;
711
712         CDEBUG(D_SUPER,
713                "%s: mount_count is %llu, last_transno is %llu\n",
714                tgt->lut_lsd.lsd_uuid, obd2obt(tgt->lut_obd)->obt_mount_count,
715                tgt->lut_last_transno);
716
717         /* Always save latest transno to keep it fresh */
718         spin_lock(&tgt->lut_translock);
719         tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
720         spin_unlock(&tgt->lut_translock);
721
722         if (tgt->lut_bottom->dd_rdonly)
723                 RETURN(0);
724
725         th = dt_trans_create(env, tgt->lut_bottom);
726         if (IS_ERR(th))
727                 RETURN(PTR_ERR(th));
728
729         th->th_sync = sync;
730
731         tti_buf_lsd(tti);
732         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
733                                      &tti->tti_buf, tti->tti_off, th);
734         if (rc)
735                 GOTO(out, rc);
736
737         rc = dt_trans_start(env, tgt->lut_bottom, th);
738         if (rc)
739                 GOTO(out, rc);
740
741         rc = tgt_server_data_write(env, tgt, th);
742 out:
743         dt_trans_stop(env, tgt->lut_bottom, th);
744
745         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
746                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
747                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
748         RETURN(rc);
749 }
750 EXPORT_SYMBOL(tgt_server_data_update);
751
752 static int tgt_truncate_object(const struct lu_env *env, struct lu_target *tgt,
753                                struct dt_object *dt, loff_t size)
754 {
755         struct thandle   *th;
756         struct lu_attr    attr;
757         int               rc;
758
759         ENTRY;
760
761         if (tgt->lut_bottom->dd_rdonly)
762                 RETURN(0);
763
764         attr.la_size = size;
765         attr.la_valid = LA_SIZE;
766
767         th = dt_trans_create(env, tgt->lut_bottom);
768         if (IS_ERR(th))
769                 RETURN(PTR_ERR(th));
770         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
771         if (rc)
772                 GOTO(cleanup, rc);
773         rc = dt_declare_attr_set(env, dt, &attr, th);
774         if (rc)
775                 GOTO(cleanup, rc);
776         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
777         if (rc)
778                 GOTO(cleanup, rc);
779
780         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th);
781         if (rc == 0)
782                 rc = dt_attr_set(env, dt, &attr, th);
783
784 cleanup:
785         dt_trans_stop(env, tgt->lut_bottom, th);
786
787         RETURN(rc);
788 }
789
790 static void tgt_client_epoch_update(const struct lu_env *env,
791                                     struct obd_export *exp)
792 {
793         struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
794         struct lu_target        *tgt = class_exp2tgt(exp);
795
796         LASSERT(tgt && tgt->lut_bottom);
797         /** VBR: set client last_epoch to current epoch */
798         if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
799                 return;
800         lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
801         tgt_client_data_update(env, exp);
802 }
803
804 static int tgt_reply_data_upgrade_check(const struct lu_env *env,
805                                         struct lu_target *tgt)
806 {
807         struct lsd_reply_header *lrh = &tgt->lut_reply_header;
808         int rc;
809
810         /*
811          * Reply data is supported by MDT targets only for now.
812          * When reply data object @lut_reply_data is NULL, it indicates the
813          * target type is OST and it should skip the upgrade check.
814          */
815         if (tgt->lut_reply_data == NULL)
816                 RETURN(0);
817
818         rc = tgt_reply_header_read(env, tgt, lrh);
819         if (rc) {
820                 CERROR("%s: failed to read %s: rc = %d\n",
821                        tgt_name(tgt), REPLY_DATA, rc);
822                 RETURN(rc);
823         }
824
825         if (lrh->lrh_magic == LRH_MAGIC)
826                 RETURN(0);
827
828         rc = tgt_truncate_object(env, tgt, tgt->lut_reply_data, 0);
829         if (rc) {
830                 CERROR("%s: failed to truncate %s: rc = %d\n",
831                        tgt_name(tgt), REPLY_DATA, rc);
832                 RETURN(rc);
833         }
834
835         lrh->lrh_magic = LRH_MAGIC;
836         lrh->lrh_header_size = sizeof(struct lsd_reply_header);
837         if (lrh->lrh_magic == LRH_MAGIC_V1)
838                 lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v1);
839         else
840                 lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v2);
841
842         rc = tgt_reply_header_write(env, tgt, lrh);
843         if (rc)
844                 CERROR("%s: failed to write header for %s: rc = %d\n",
845                        tgt_name(tgt), REPLY_DATA, rc);
846
847         RETURN(rc);
848 }
849
850 /**
851  * Update boot epoch when recovery ends
852  */
853 void tgt_boot_epoch_update(struct lu_target *tgt)
854 {
855         struct lu_env            env;
856         struct ptlrpc_request   *req;
857         __u32                    start_epoch;
858         LIST_HEAD(client_list);
859         int                      rc;
860
861         if (tgt->lut_obd->obd_stopping)
862                 return;
863
864         rc = lu_env_init(&env, LCT_DT_THREAD);
865         if (rc) {
866                 CERROR("%s: can't initialize environment: rc = %d\n",
867                         tgt->lut_obd->obd_name, rc);
868                 return;
869         }
870
871         spin_lock(&tgt->lut_translock);
872         start_epoch = (tgt->lut_last_transno >> LR_EPOCH_BITS) + 1;
873         tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
874         tgt->lut_lsd.lsd_start_epoch = start_epoch;
875         spin_unlock(&tgt->lut_translock);
876
877         /**
878          * The recovery is not yet finished and final queue can still be updated
879          * with resend requests. Move final list to separate one for processing
880          */
881         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
882         list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
883         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
884
885         /**
886          * go through list of exports participated in recovery and
887          * set new epoch for them
888          */
889         list_for_each_entry(req, &client_list, rq_list) {
890                 LASSERT(!req->rq_export->exp_delayed);
891                 if (!req->rq_export->exp_vbr_failed)
892                         tgt_client_epoch_update(&env, req->rq_export);
893         }
894         /** return list back at once */
895         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
896         list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
897         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
898
899         /**
900          * Clear MULTI RPCS incompatibility flag if there is no multi-rpcs
901          * client in last_rcvd file
902          */
903         if (atomic_read(&tgt->lut_num_clients) == 0)
904                 tgt->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
905
906         /** update server epoch */
907         tgt_server_data_update(&env, tgt, 1);
908         tgt_reply_data_upgrade_check(&env, tgt);
909         lu_env_fini(&env);
910 }
911
912 /**
913  * commit callback, need to update last_committed value
914  */
915 struct tgt_last_committed_callback {
916         struct dt_txn_commit_cb  llcc_cb;
917         struct lu_target        *llcc_tgt;
918         struct obd_export       *llcc_exp;
919         __u64                    llcc_transno;
920 };
921
922 static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
923                                   struct dt_txn_commit_cb *cb, int err)
924 {
925         struct tgt_last_committed_callback *ccb;
926
927         ccb = container_of(cb, struct tgt_last_committed_callback, llcc_cb);
928
929         LASSERT(ccb->llcc_exp);
930         LASSERT(ccb->llcc_tgt != NULL);
931         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
932
933         if (th->th_reserved_quota.lqi_space > 0) {
934                 CDEBUG(D_QUOTA, "free quota %llu %llu\n",
935                        th->th_reserved_quota.lqi_id.qid_gid,
936                        th->th_reserved_quota.lqi_space);
937
938                 /* env can be NULL for freeing reserved quota */
939                 th->th_reserved_quota.lqi_space *= -1;
940                 dt_reserve_or_free_quota(NULL, th->th_dev,
941                                          &th->th_reserved_quota);
942         }
943
944         /* error hit, don't update last committed to provide chance to
945          * replay data after fail */
946         if (err != 0)
947                 goto out;
948
949         /* Fast path w/o spinlock, if exp_last_committed was updated
950          * with higher transno, no need to take spinlock and check,
951          * also no need to update obd_last_committed. */
952         if (ccb->llcc_transno <= ccb->llcc_exp->exp_last_committed)
953                 goto out;
954         spin_lock(&ccb->llcc_tgt->lut_translock);
955         if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
956                 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
957
958         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
959                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
960                 spin_unlock(&ccb->llcc_tgt->lut_translock);
961
962                 ptlrpc_commit_replies(ccb->llcc_exp);
963                 tgt_cancel_slc_locks(ccb->llcc_tgt, ccb->llcc_transno);
964         } else {
965                 spin_unlock(&ccb->llcc_tgt->lut_translock);
966         }
967
968         CDEBUG(D_HA, "%s: transno %lld is committed\n",
969                ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
970
971 out:
972         class_export_cb_put(ccb->llcc_exp);
973         OBD_FREE_PTR(ccb);
974 }
975
976 /**
977  * Add commit callback function, it returns a non-zero value to inform
978  * caller to use sync transaction if necessary.
979  */
980 static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
981                                   struct obd_export *exp, __u64 transno)
982 {
983         struct tgt_last_committed_callback      *ccb;
984         struct dt_txn_commit_cb                 *dcb;
985         int                                      rc;
986
987         OBD_ALLOC_PTR(ccb);
988         if (ccb == NULL)
989                 return -ENOMEM;
990
991         ccb->llcc_tgt = tgt;
992         ccb->llcc_exp = class_export_cb_get(exp);
993         ccb->llcc_transno = transno;
994
995         dcb = &ccb->llcc_cb;
996         dcb->dcb_func = tgt_cb_last_committed;
997         INIT_LIST_HEAD(&dcb->dcb_linkage);
998         strscpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
999
1000         rc = dt_trans_cb_add(th, dcb);
1001         if (rc) {
1002                 class_export_cb_put(exp);
1003                 OBD_FREE_PTR(ccb);
1004         }
1005
1006         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1007                 /* report failure to force synchronous operation */
1008                 return -EPERM;
1009
1010         /* if exp_need_sync is set, return non-zero value to force
1011          * a sync transaction. */
1012         return rc ? rc : exp->exp_need_sync;
1013 }
1014
1015 static int tgt_is_local_client(const struct lu_env *env,
1016                                       struct obd_export *exp)
1017 {
1018         struct lu_target        *tgt = class_exp2tgt(exp);
1019         struct tgt_session_info *tsi = tgt_ses_info(env);
1020         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1021
1022         if (exp_connect_flags(exp) & OBD_CONNECT_MDS ||
1023             exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1024                 return 0;
1025         if (tgt->lut_local_recovery)
1026                 return 0;
1027         if (!req)
1028                 return 0;
1029         if (!LNetIsPeerLocal(&req->rq_peer.nid))
1030                 return 0;
1031
1032         return 1;
1033 }
1034
1035 /**
1036  * Add new client to the last_rcvd upon new connection.
1037  *
1038  * We use a bitmap to locate a free space in the last_rcvd file and initialize
1039  * tg_export_data.
1040  */
1041 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
1042 {
1043         struct tg_export_data   *ted = &exp->exp_target_data;
1044         struct lu_target        *tgt = class_exp2tgt(exp);
1045         int                      rc = 0, idx;
1046
1047         ENTRY;
1048
1049         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
1050         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
1051                 RETURN(0);
1052
1053         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1054                 RETURN(0);
1055
1056         if (tgt_is_local_client(env, exp)) {
1057                 LCONSOLE_WARN("%s: local client %s w/o recovery\n",
1058                               exp->exp_obd->obd_name, ted->ted_lcd->lcd_uuid);
1059                 exp->exp_no_recovery = 1;
1060                 RETURN(0);
1061         }
1062
1063         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
1064          * there's no need for extra complication here
1065          */
1066         idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
1067 repeat:
1068         if (idx >= LR_MAX_CLIENTS ||
1069             CFS_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
1070                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
1071                        tgt->lut_obd->obd_name,  idx);
1072                 RETURN(-EOVERFLOW);
1073         }
1074         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1075                 idx = find_next_zero_bit(tgt->lut_client_bitmap,
1076                                              LR_MAX_CLIENTS, idx);
1077                 goto repeat;
1078         }
1079
1080         ted->ted_lr_idx = idx;
1081         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1082                           idx * tgt->lut_lsd.lsd_client_size;
1083
1084         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
1085
1086         if (tgt_is_multimodrpcs_client(exp)) {
1087                 /* Set MULTI RPCS incompatibility flag to prevent previous
1088                  * Lustre versions to mount a target with reply_data file */
1089                 if (!(tgt->lut_lsd.lsd_feature_incompat &
1090                       OBD_INCOMPAT_MULTI_RPCS)) {
1091                         tgt->lut_lsd.lsd_feature_incompat |=
1092                                                         OBD_INCOMPAT_MULTI_RPCS;
1093                         rc = tgt_server_data_update(env, tgt, 1);
1094                         if (rc < 0) {
1095                                 CERROR("%s: unable to set MULTI RPCS "
1096                                        "incompatibility flag\n",
1097                                        exp->exp_obd->obd_name);
1098                                 RETURN(rc);
1099                         }
1100                 }
1101
1102                 /* assign client slot generation */
1103                 ted->ted_lcd->lcd_generation =
1104                                 atomic_inc_return(&tgt->lut_client_generation);
1105         } else {
1106                 ted->ted_lcd->lcd_generation = 0;
1107         }
1108
1109         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s' "
1110                "generation %d\n",
1111                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1112                ted->ted_lcd->lcd_uuid, ted->ted_lcd->lcd_generation);
1113
1114         if (CFS_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
1115                 RETURN(-ENOSPC);
1116
1117         rc = tgt_client_data_update(env, exp);
1118         if (rc) {
1119                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
1120                        tgt->lut_obd->obd_name, idx, rc);
1121                 RETURN(rc);
1122         }
1123
1124         if (tgt_is_multimodrpcs_client(exp))
1125                 atomic_inc(&tgt->lut_num_clients);
1126
1127         RETURN(0);
1128 }
1129 EXPORT_SYMBOL(tgt_client_new);
1130
1131 /* Add an existing client to the MDS in-memory state based on
1132  * a client that was previously found in the last_rcvd file and
1133  * already has an assigned slot (idx >= 0).
1134  *
1135  * It should not be possible to fail adding an existing client - otherwise
1136  * mdt_init_server_data() callsite needs to be fixed.
1137  */
1138 int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
1139 {
1140         struct tg_export_data   *ted = &exp->exp_target_data;
1141         struct lu_target        *tgt = class_exp2tgt(exp);
1142
1143         ENTRY;
1144
1145         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
1146         LASSERTF(idx >= 0, "%d\n", idx);
1147
1148         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
1149             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1150                 RETURN(0);
1151
1152         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1153                 CERROR("%s: client %d: bit already set in bitmap!!\n",
1154                        tgt->lut_obd->obd_name,  idx);
1155                 LBUG();
1156         }
1157
1158         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added, "
1159                "generation %d\n",
1160                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid,
1161                ted->ted_lcd->lcd_generation);
1162
1163         ted->ted_lr_idx = idx;
1164         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1165                           idx * tgt->lut_lsd.lsd_client_size;
1166
1167         mutex_init(&ted->ted_lcd_lock);
1168
1169         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
1170
1171         RETURN(0);
1172 }
1173
1174 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
1175 {
1176         struct tg_export_data   *ted = &exp->exp_target_data;
1177         struct lu_target        *tgt = class_exp2tgt(exp);
1178         int                      rc;
1179
1180         ENTRY;
1181
1182         LASSERT(ted->ted_lcd);
1183
1184         if (unlikely(tgt == NULL)) {
1185                 CDEBUG(D_ERROR, "%s: No target for connected export\n",
1186                        class_exp2obd(exp)->obd_name);
1187                 RETURN(-EINVAL);
1188         }
1189
1190         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
1191         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
1192                     (char *)tgt->lut_obd->obd_uuid.uuid) ||
1193             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT ||
1194             exp->exp_no_recovery)
1195                 RETURN(0);
1196
1197         /* Slot may be not yet assigned, use case is race between Client
1198          * reconnect and forced eviction */
1199         if (ted->ted_lr_idx < 0) {
1200                 CWARN("%s: client with UUID '%s' not in bitmap\n",
1201                       tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid);
1202                 RETURN(0);
1203         }
1204
1205         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
1206                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1207                ted->ted_lcd->lcd_uuid);
1208
1209         /* Clear the bit _after_ zeroing out the client so we don't
1210            race with filter_client_add and zero out new clients.*/
1211         if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
1212                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
1213                        tgt->lut_obd->obd_name, ted->ted_lr_idx);
1214                 LBUG();
1215         }
1216
1217         /* Do not erase record for recoverable client. */
1218         if (exp->exp_flags & OBD_OPT_FAILOVER)
1219                 RETURN(0);
1220
1221         if (CFS_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_DEL))
1222                 RETURN(0);
1223
1224         /* Make sure the server's last_transno is up to date.
1225          * This should be done before zeroing client slot so last_transno will
1226          * be in server data or in client data in case of failure */
1227         rc = tgt_server_data_update(env, tgt, 0);
1228         if (rc != 0) {
1229                 CERROR("%s: failed to update server data, skip client %s "
1230                        "zeroing, rc %d\n", tgt->lut_obd->obd_name,
1231                        ted->ted_lcd->lcd_uuid, rc);
1232                 RETURN(rc);
1233         }
1234
1235         /* Race between an eviction and a disconnection ?*/
1236         mutex_lock(&ted->ted_lcd_lock);
1237         if (ted->ted_lcd->lcd_uuid[0] == '\0') {
1238                 mutex_unlock(&ted->ted_lcd_lock);
1239                 RETURN(rc);
1240         }
1241
1242         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
1243         mutex_unlock(&ted->ted_lcd_lock);
1244
1245         rc = tgt_client_data_update(env, exp);
1246
1247         if (!rc && tgt_is_multimodrpcs_record(tgt, ted->ted_lcd))
1248                 atomic_dec(&tgt->lut_num_clients);
1249
1250         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
1251                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
1252                tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
1253                ted->ted_lr_idx, ted->ted_lr_off, rc);
1254         RETURN(rc);
1255 }
1256 EXPORT_SYMBOL(tgt_client_del);
1257
1258 static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
1259 {
1260         struct tg_export_data   *ted = &exp->exp_target_data;
1261         struct lu_target        *lut = class_exp2tgt(exp);
1262         struct tg_reply_data    *trd, *tmp;
1263
1264         if (tag == 0)
1265                 return;
1266
1267         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
1268                 if (trd->trd_tag != tag)
1269                         continue;
1270
1271                 LASSERT(ergo(tgt_is_increasing_xid_client(exp),
1272                              trd->trd_reply.lrd_xid <= xid));
1273
1274                 ted->ted_release_tag++;
1275                 tgt_release_reply_data(lut, ted, trd);
1276         }
1277 }
1278
1279 static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
1280                        struct tg_export_data *ted, struct tg_reply_data *trd,
1281                        struct ptlrpc_request *req,
1282                        struct thandle *th, bool update_lrd_file)
1283 {
1284         struct tgt_session_info *tsi = NULL;
1285         struct lsd_reply_data *lrd;
1286         int i = -1;
1287         int rc;
1288
1289         lrd = &trd->trd_reply;
1290         /* update export last transno */
1291         mutex_lock(&ted->ted_lcd_lock);
1292         if (lrd->lrd_transno > ted->ted_lcd->lcd_last_transno)
1293                 ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
1294         mutex_unlock(&ted->ted_lcd_lock);
1295
1296         if (!tgt) {
1297                 trd->trd_index = TRD_INDEX_MEMORY;
1298                 GOTO(add_reply_data, rc = 0);
1299         }
1300
1301         if (env) {
1302                 tsi = tgt_ses_info(env);
1303                 if (tsi->tsi_batch_trd) {
1304                         LASSERT(tsi->tsi_batch_env);
1305                         trd = tsi->tsi_batch_trd;
1306                         i = trd->trd_index;
1307                 }
1308         }
1309
1310         if (i == -1) {
1311                 /* find a empty slot */
1312                 i = tgt_find_free_reply_slot(tgt);
1313                 if (unlikely(i < 0)) {
1314                         CERROR("%s: couldn't find a slot for reply data: rc = %d\n",
1315                                tgt_name(tgt), i);
1316                         RETURN(i);
1317                 }
1318                 trd->trd_index = i;
1319         }
1320
1321         if (update_lrd_file) {
1322                 struct lsd_reply_header *lrh = &tgt->lut_reply_header;
1323                 loff_t  off;
1324
1325                 /* write reply data to disk */
1326                 off = lrh->lrh_header_size + lrh->lrh_reply_size * i;
1327                 rc = tgt_reply_data_write(env, tgt, lrd, off, th);
1328                 if (unlikely(rc != 0)) {
1329                         CERROR("%s: can't update %s file: rc = %d\n",
1330                                tgt_name(tgt), REPLY_DATA, rc);
1331                         GOTO(free_slot, rc);
1332                 }
1333         }
1334
1335 add_reply_data:
1336         /* add reply data to target export's reply list */
1337         mutex_lock(&ted->ted_lcd_lock);
1338         if (req != NULL) {
1339                 int exclude = tgt_is_increasing_xid_client(req->rq_export) ?
1340                               MSG_REPLAY : MSG_REPLAY|MSG_RESENT;
1341
1342                 if (req->rq_obsolete) {
1343                         CDEBUG(D_INFO,
1344                                "drop reply data update for obsolete req xid=%llu,"
1345                                "transno=%llu, tag=%hu\n", req->rq_xid,
1346                                lrd->lrd_transno, trd->trd_tag);
1347                         mutex_unlock(&ted->ted_lcd_lock);
1348                         GOTO(free_slot, rc = -EBADR);
1349                 }
1350
1351                 if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude) &&
1352                     !(tsi && tsi->tsi_batch_env &&
1353                       trd->trd_reply.lrd_batch_idx > 0))
1354                         tgt_clean_by_tag(req->rq_export, req->rq_xid,
1355                                          trd->trd_tag);
1356         }
1357
1358         /*
1359          * For the batched RPC, all sub requests use one common @trd for the
1360          * reply data.
1361          */
1362         if (list_empty(&trd->trd_list)) {
1363                 list_add(&trd->trd_list, &ted->ted_reply_list);
1364                 ted->ted_reply_cnt++;
1365                 if (ted->ted_reply_cnt > ted->ted_reply_max)
1366                         ted->ted_reply_max = ted->ted_reply_cnt;
1367         }
1368         mutex_unlock(&ted->ted_lcd_lock);
1369
1370         CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
1371                "tag %hu, client gen %u, slot idx %d\n",
1372                trd, lrd->lrd_xid, lrd->lrd_transno,
1373                trd->trd_tag, lrd->lrd_client_gen, trd->trd_index);
1374
1375         RETURN(0);
1376
1377 free_slot:
1378         if (tgt != NULL)
1379                 tgt_clear_reply_slot(tgt, trd->trd_index);
1380         return rc;
1381 }
1382
1383 int tgt_mk_reply_data(const struct lu_env *env,
1384                       struct lu_target *tgt,
1385                       struct tg_export_data *ted,
1386                       struct ptlrpc_request *req,
1387                       __u64 opdata,
1388                       struct thandle *th,
1389                       bool write_update,
1390                       __u64 transno)
1391 {
1392         struct tg_reply_data *trd = NULL;
1393         struct lsd_reply_data *lrd;
1394         __u64 *pre_versions = NULL;
1395         struct tgt_session_info *tsi = NULL;
1396         int rc;
1397
1398         if (env != NULL) {
1399                 tsi = tgt_ses_info(env);
1400                 if (tsi->tsi_batch_trd) {
1401                         LASSERT(tsi->tsi_batch_env);
1402                         trd = tsi->tsi_batch_trd;
1403                 }
1404         }
1405
1406         if (trd == NULL) {
1407                 OBD_ALLOC_PTR(trd);
1408                 if (unlikely(trd == NULL))
1409                         RETURN(-ENOMEM);
1410
1411                 INIT_LIST_HEAD(&trd->trd_list);
1412         }
1413
1414         /* fill reply data information */
1415         lrd = &trd->trd_reply;
1416         lrd->lrd_transno = transno;
1417         if (tsi && tsi->tsi_batch_env) {
1418                 if (tsi->tsi_batch_idx == 0) {
1419                         LASSERT(req != NULL);
1420                         tsi->tsi_batch_trd = trd;
1421                         trd->trd_index = -1;
1422                         lrd->lrd_xid = req->rq_xid;
1423                         trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
1424                         lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
1425                 }
1426                 lrd->lrd_batch_idx = tsi->tsi_batch_idx;
1427         } else if (req != NULL) {
1428                 lrd->lrd_xid = req->rq_xid;
1429                 trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
1430                 lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
1431                 if (write_update) {
1432                         pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1433                         lrd->lrd_result = th->th_result;
1434                 }
1435         } else {
1436                 LASSERT(env != NULL);
1437                 LASSERT(tsi->tsi_xid != 0);
1438
1439                 lrd->lrd_xid = tsi->tsi_xid;
1440                 lrd->lrd_result = tsi->tsi_result;
1441                 lrd->lrd_client_gen = tsi->tsi_client_gen;
1442         }
1443
1444         lrd->lrd_data = opdata;
1445         if (pre_versions) {
1446                 trd->trd_pre_versions[0] = pre_versions[0];
1447                 trd->trd_pre_versions[1] = pre_versions[1];
1448                 trd->trd_pre_versions[2] = pre_versions[2];
1449                 trd->trd_pre_versions[3] = pre_versions[3];
1450         }
1451
1452         if (tsi && tsi->tsi_open_obj)
1453                 trd->trd_object = *lu_object_fid(&tsi->tsi_open_obj->do_lu);
1454
1455         rc = tgt_add_reply_data(env, tgt, ted, trd, req,
1456                                 th, write_update);
1457         if (rc < 0) {
1458                 OBD_FREE_PTR(trd);
1459                 if (rc == -EBADR)
1460                         rc = 0;
1461         }
1462         return rc;
1463
1464 }
1465 EXPORT_SYMBOL(tgt_mk_reply_data);
1466
1467 /*
1468  * last_rcvd & last_committed update callbacks
1469  */
1470 static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
1471                                 struct dt_object *obj, __u64 opdata,
1472                                 struct thandle *th, struct ptlrpc_request *req)
1473 {
1474         struct tgt_thread_info  *tti = tgt_th_info(env);
1475         struct tgt_session_info *tsi = tgt_ses_info(env);
1476         struct obd_export *exp = tsi->tsi_exp;
1477         struct tg_export_data *ted;
1478         __u64 *transno_p;
1479         bool nolcd = false;
1480         int rc = 0;
1481
1482         ENTRY;
1483
1484
1485         LASSERT(exp != NULL);
1486         ted = &exp->exp_target_data;
1487
1488         /* Some clients don't support recovery, and they don't have last_rcvd
1489          * client data:
1490          * 1. lightweight clients.
1491          * 2. local clients on MDS which doesn't enable "localrecov".
1492          * 3. OFD connect may cause transaction before export has last_rcvd
1493          *    slot.
1494          */
1495         if (ted->ted_lr_idx < 0)
1496                 nolcd = true;
1497
1498         if (req != NULL)
1499                 tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1500         else
1501                 /* From update replay, tti_transno should be set already */
1502                 LASSERT(tti->tti_transno != 0);
1503
1504         spin_lock(&tgt->lut_translock);
1505         if (th->th_result != 0) {
1506                 if (tti->tti_transno != 0) {
1507                         CERROR("%s: replay transno %llu failed: rc = %d\n",
1508                                tgt_name(tgt), tti->tti_transno, th->th_result);
1509                 }
1510         } else if (tti->tti_transno == 0) {
1511                 tti->tti_transno = ++tgt->lut_last_transno;
1512         } else {
1513                 /* should be replay */
1514                 if (tti->tti_transno > tgt->lut_last_transno)
1515                         tgt->lut_last_transno = tti->tti_transno;
1516         }
1517         spin_unlock(&tgt->lut_translock);
1518
1519         /** VBR: set new versions */
1520         if (th->th_result == 0 && obj != NULL) {
1521                 struct dt_object *dto = dt_object_locate(obj, th->th_dev);
1522
1523                 dt_version_set(env, dto, tti->tti_transno, th);
1524                 if (unlikely(tsi->tsi_dv_update))
1525                         dt_data_version_set(env, dto, tti->tti_transno, th);
1526         }
1527
1528         /* filling reply data */
1529         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1530                tti->tti_transno, tgt->lut_obd->obd_last_committed);
1531
1532         if (req != NULL) {
1533                 req->rq_transno = tti->tti_transno;
1534                 lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
1535         }
1536
1537         /* if can't add callback, do sync write */
1538         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp, tti->tti_transno);
1539
1540         if (nolcd) {
1541                 /* store transno in the last_rcvd header */
1542                 spin_lock(&tgt->lut_translock);
1543                 if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
1544                         tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
1545                         spin_unlock(&tgt->lut_translock);
1546                         /* Although current connection doesn't have slot
1547                          * in the last_rcvd, we still want to maintain
1548                          * the in-memory lsd_client_data structure in order to
1549                          * properly handle reply reconstruction. */
1550                         rc = tgt_server_data_write(env, tgt, th);
1551                 } else {
1552                         spin_unlock(&tgt->lut_translock);
1553                 }
1554         } else if (ted->ted_lr_off == 0) {
1555                 CERROR("%s: client idx %d has offset %lld\n",
1556                        tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
1557                 RETURN(-EINVAL);
1558         }
1559
1560         /* Target that supports multiple reply data */
1561         if (tgt_is_multimodrpcs_client(exp)) {
1562                 return tgt_mk_reply_data(env, tgt, ted, req, opdata, th,
1563                                          !!(req != NULL), tti->tti_transno);
1564         }
1565
1566         /* Enough for update replay, let's return */
1567         if (req == NULL)
1568                 RETURN(rc);
1569
1570         mutex_lock(&ted->ted_lcd_lock);
1571         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1572         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
1573                 transno_p = &ted->ted_lcd->lcd_last_close_transno;
1574                 ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
1575                 ted->ted_lcd->lcd_last_close_result = th->th_result;
1576         } else {
1577                 /* VBR: save versions in last_rcvd for reconstruct. */
1578                 __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1579
1580                 if (pre_versions) {
1581                         ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
1582                         ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
1583                         ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
1584                         ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
1585                 }
1586                 transno_p = &ted->ted_lcd->lcd_last_transno;
1587                 ted->ted_lcd->lcd_last_xid = req->rq_xid;
1588                 ted->ted_lcd->lcd_last_result = th->th_result;
1589                 /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
1590                  * see struct ldlm_reply->lock_policy_res1; */
1591                 ted->ted_lcd->lcd_last_data = opdata;
1592         }
1593
1594         /* Update transno in slot only if non-zero number, i.e. no errors */
1595         if (likely(tti->tti_transno != 0)) {
1596                 /* Don't overwrite bigger transaction number with lower one.
1597                  * That is not sign of problem in all cases, but in any case
1598                  * this value should be monotonically increased only. */
1599                 if (*transno_p > tti->tti_transno) {
1600                         if (!tgt->lut_no_reconstruct) {
1601                                 CERROR("%s: trying to overwrite bigger transno:"
1602                                        "on-disk: %llu, new: %llu replay: "
1603                                        "%d. See LU-617.\n", tgt_name(tgt),
1604                                        *transno_p, tti->tti_transno,
1605                                        req_is_replay(req));
1606                                 if (req_is_replay(req)) {
1607                                         spin_lock(&req->rq_export->exp_lock);
1608                                         req->rq_export->exp_vbr_failed = 1;
1609                                         spin_unlock(&req->rq_export->exp_lock);
1610                                 }
1611                                 mutex_unlock(&ted->ted_lcd_lock);
1612                                 RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
1613                         }
1614                 } else {
1615                         *transno_p = tti->tti_transno;
1616                 }
1617         }
1618
1619         if (!nolcd) {
1620                 tti->tti_off = ted->ted_lr_off;
1621                 if (CFS_FAIL_CHECK(OBD_FAIL_TGT_RCVD_EIO))
1622                         rc = -EIO;
1623                 else
1624                         rc = tgt_client_data_write(env, tgt, ted->ted_lcd,
1625                                                    &tti->tti_off, th);
1626                 if (rc < 0) {
1627                         mutex_unlock(&ted->ted_lcd_lock);
1628                         RETURN(rc);
1629                 }
1630         }
1631         mutex_unlock(&ted->ted_lcd_lock);
1632         RETURN(rc);
1633 }
1634
1635 /*
1636  * last_rcvd update for echo client simulation.
1637  * It updates last_rcvd client slot and version of object in
1638  * simple way but with all locks to simulate all drawbacks
1639  */
1640 static int tgt_last_rcvd_update_echo(const struct lu_env *env,
1641                                      struct lu_target *tgt,
1642                                      struct dt_object *obj,
1643                                      struct thandle *th,
1644                                      struct obd_export *exp)
1645 {
1646         struct tgt_thread_info  *tti = tgt_th_info(env);
1647         struct tg_export_data   *ted = &exp->exp_target_data;
1648         int                      rc = 0;
1649
1650         ENTRY;
1651
1652         tti->tti_transno = 0;
1653
1654         spin_lock(&tgt->lut_translock);
1655         if (th->th_result == 0)
1656                 tti->tti_transno = ++tgt->lut_last_transno;
1657         spin_unlock(&tgt->lut_translock);
1658
1659         /** VBR: set new versions */
1660         if (th->th_result == 0 && obj != NULL)
1661                 dt_version_set(env, obj, tti->tti_transno, th);
1662
1663         /* if can't add callback, do sync write */
1664         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
1665                                                 tti->tti_transno);
1666
1667         LASSERT(ted->ted_lr_off > 0);
1668
1669         mutex_lock(&ted->ted_lcd_lock);
1670         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1671         ted->ted_lcd->lcd_last_transno = tti->tti_transno;
1672         ted->ted_lcd->lcd_last_result = th->th_result;
1673
1674         tti->tti_off = ted->ted_lr_off;
1675         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
1676         mutex_unlock(&ted->ted_lcd_lock);
1677         RETURN(rc);
1678 }
1679
1680 static int tgt_clients_data_init(const struct lu_env *env,
1681                                  struct lu_target *tgt,
1682                                  unsigned long last_size)
1683 {
1684         struct obd_device       *obd = tgt->lut_obd;
1685         struct lr_server_data   *lsd = &tgt->lut_lsd;
1686         struct lsd_client_data  *lcd = NULL;
1687         struct tg_export_data   *ted;
1688         int                      cl_idx;
1689         int                      rc = 0;
1690         loff_t                   off = lsd->lsd_client_start;
1691         __u32                    generation = 0;
1692         struct cfs_hash         *hash = NULL;
1693
1694         ENTRY;
1695
1696         if (tgt->lut_bottom->dd_rdonly)
1697                 RETURN(0);
1698
1699         BUILD_BUG_ON(offsetof(struct lsd_client_data, lcd_padding) +
1700                      sizeof(lcd->lcd_padding) != LR_CLIENT_SIZE);
1701
1702         OBD_ALLOC_PTR(lcd);
1703         if (lcd == NULL)
1704                 RETURN(-ENOMEM);
1705
1706         hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
1707         if (hash == NULL)
1708                 GOTO(err_out, rc = -ENODEV);
1709
1710         for (cl_idx = 0; off < last_size; cl_idx++) {
1711                 struct obd_export       *exp;
1712                 __u64                    last_transno;
1713
1714                 /* Don't assume off is incremented properly by
1715                  * read_record(), in case sizeof(*lcd)
1716                  * isn't the same as fsd->lsd_client_size.  */
1717                 off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
1718                 rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
1719                 if (rc) {
1720                         CERROR("%s: error reading last_rcvd %s idx %d off "
1721                                "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
1722                                cl_idx, off, rc);
1723                         rc = 0;
1724                         break; /* read error shouldn't cause startup to fail */
1725                 }
1726
1727                 if (lcd->lcd_uuid[0] == '\0') {
1728                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
1729                                cl_idx);
1730                         continue;
1731                 }
1732
1733                 last_transno = lcd_last_transno(lcd);
1734
1735                 /* These exports are cleaned up by disconnect, so they
1736                  * need to be set up like real exports as connect does.
1737                  */
1738                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: %llu"
1739                        " srv lr: %llu lx: %llu gen %u\n", lcd->lcd_uuid,
1740                        cl_idx, last_transno, lsd->lsd_last_transno,
1741                        lcd_last_xid(lcd), lcd->lcd_generation);
1742
1743                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
1744                 if (IS_ERR(exp)) {
1745                         if (PTR_ERR(exp) == -EALREADY) {
1746                                 /* export already exists, zero out this one */
1747                                 CERROR("%s: Duplicate export %s!\n",
1748                                        tgt_name(tgt), lcd->lcd_uuid);
1749                                 continue;
1750                         }
1751                         GOTO(err_out, rc = PTR_ERR(exp));
1752                 }
1753
1754                 ted = &exp->exp_target_data;
1755                 *ted->ted_lcd = *lcd;
1756
1757                 rc = tgt_client_add(env, exp, cl_idx);
1758                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
1759                 /* VBR: set export last committed version */
1760                 exp->exp_last_committed = last_transno;
1761                 spin_lock(&exp->exp_lock);
1762                 exp->exp_connecting = 0;
1763                 exp->exp_in_recovery = 0;
1764                 spin_unlock(&exp->exp_lock);
1765                 atomic_inc(&obd->obd_max_recoverable_clients);
1766
1767                 if (tgt_is_multimodrpcs_record(tgt, lcd)) {
1768                         atomic_inc(&tgt->lut_num_clients);
1769
1770                         /* compute the highest valid client generation */
1771                         generation = max(generation, lcd->lcd_generation);
1772                         /* fill client_generation <-> export hash table */
1773                         rc = cfs_hash_add_unique(hash, &lcd->lcd_generation,
1774                                                  &exp->exp_gen_hash);
1775                         if (rc != 0) {
1776                                 CERROR("%s: duplicate export for client "
1777                                        "generation %u\n",
1778                                        tgt_name(tgt), lcd->lcd_generation);
1779                                 class_export_put(exp);
1780                                 GOTO(err_out, rc);
1781                         }
1782                 }
1783
1784                 class_export_put(exp);
1785
1786                 rc = rev_import_init(exp);
1787                 if (rc != 0) {
1788                         class_unlink_export(exp);
1789                         GOTO(err_out, rc);
1790                 }
1791
1792                 /* Need to check last_rcvd even for duplicated exports. */
1793                 CDEBUG(D_OTHER, "client at idx %d has last_transno = %llu\n",
1794                        cl_idx, last_transno);
1795
1796                 spin_lock(&tgt->lut_translock);
1797                 tgt->lut_last_transno = max(last_transno,
1798                                             tgt->lut_last_transno);
1799                 spin_unlock(&tgt->lut_translock);
1800         }
1801
1802         /* record highest valid client generation */
1803         atomic_set(&tgt->lut_client_generation, generation);
1804
1805 err_out:
1806         if (hash != NULL)
1807                 cfs_hash_putref(hash);
1808         OBD_FREE_PTR(lcd);
1809         RETURN(rc);
1810 }
1811
1812 struct server_compat_data {
1813         __u32 rocompat;
1814         __u32 incompat;
1815         __u32 rocinit;
1816         __u32 incinit;
1817 };
1818
1819 static struct server_compat_data tgt_scd[] = {
1820         [LDD_F_SV_TYPE_MDT] = {
1821                 .rocompat = OBD_ROCOMPAT_LOVOBJID,
1822                 .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1823                             OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
1824                             OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI |
1825                             OBD_INCOMPAT_MULTI_RPCS,
1826                 .rocinit = OBD_ROCOMPAT_LOVOBJID,
1827                 .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1828                            OBD_INCOMPAT_MULTI_OI,
1829         },
1830         [LDD_F_SV_TYPE_OST] = {
1831                 .rocompat = OBD_ROCOMPAT_IDX_IN_IDIF,
1832                 .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
1833                             OBD_INCOMPAT_FID,
1834                 .rocinit = OBD_ROCOMPAT_IDX_IN_IDIF,
1835                 .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
1836         }
1837 };
1838
1839 int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
1840 {
1841         struct tgt_thread_info          *tti = tgt_th_info(env);
1842         struct lr_server_data           *lsd = &tgt->lut_lsd;
1843         unsigned long                    last_rcvd_size;
1844         __u32                            index;
1845         int                              rc, type;
1846
1847         rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr);
1848         if (rc)
1849                 RETURN(rc);
1850
1851         last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
1852
1853         /* ensure padding in the struct is the correct size */
1854         BUILD_BUG_ON(offsetof(struct lr_server_data, lsd_padding) +
1855                      sizeof(lsd->lsd_padding) != LR_SERVER_SIZE);
1856
1857         rc = server_name2index(tgt_name(tgt), &index, NULL);
1858         if (rc < 0) {
1859                 CERROR("%s: Can not get index from name: rc = %d\n",
1860                        tgt_name(tgt), rc);
1861                 RETURN(rc);
1862         }
1863         /* server_name2index() returns type */
1864         type = rc;
1865         if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
1866                 CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
1867                 RETURN(-EINVAL);
1868         }
1869
1870         /* last_rcvd on OST doesn't provide reconstruct support because there
1871          * may be up to 8 in-flight write requests per single slot in
1872          * last_rcvd client data
1873          */
1874         tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
1875
1876         if (last_rcvd_size == 0) {
1877                 LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
1878
1879                 memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
1880                        sizeof(lsd->lsd_uuid));
1881                 lsd->lsd_last_transno = 0;
1882                 lsd->lsd_mount_count = 0;
1883                 lsd->lsd_server_size = LR_SERVER_SIZE;
1884                 lsd->lsd_client_start = LR_CLIENT_START;
1885                 lsd->lsd_client_size = LR_CLIENT_SIZE;
1886                 lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
1887                 lsd->lsd_osd_index = index;
1888                 lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
1889                 lsd->lsd_feature_incompat = tgt_scd[type].incinit;
1890         } else {
1891                 rc = tgt_server_data_read(env, tgt);
1892                 if (rc) {
1893                         CERROR("%s: error reading LAST_RCVD: rc= %d\n",
1894                                tgt_name(tgt), rc);
1895                         RETURN(rc);
1896                 }
1897                 if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
1898                         if (tgt->lut_bottom->dd_rdonly) {
1899                                 /* Such difference may be caused by mounting
1900                                  * up snapshot with new fsname under rd_only
1901                                  * mode. But even if it was NOT, it will not
1902                                  * damage the system because of "rd_only". */
1903                                 memcpy(lsd->lsd_uuid,
1904                                        tgt->lut_obd->obd_uuid.uuid,
1905                                        sizeof(lsd->lsd_uuid));
1906                         } else {
1907                                 LCONSOLE_ERROR_MSG(0x157, "Trying to start "
1908                                                    "OBD %s using the wrong "
1909                                                    "disk %s. Were the /dev/ "
1910                                                    "assignments rearranged?\n",
1911                                                    tgt->lut_obd->obd_uuid.uuid,
1912                                                    lsd->lsd_uuid);
1913                                 RETURN(-EINVAL);
1914                         }
1915                 }
1916
1917                 if (lsd->lsd_osd_index != index) {
1918                         LCONSOLE_ERROR_MSG(0x157,
1919                                            "%s: index %d in last rcvd is different with the index %d in config log, It might be disk corruption!\n",
1920                                            tgt_name(tgt),
1921                                            lsd->lsd_osd_index, index);
1922                         RETURN(-EINVAL);
1923                 }
1924         }
1925
1926         if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
1927                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
1928                        tgt_name(tgt),
1929                        lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
1930                 RETURN(-EINVAL);
1931         }
1932
1933         if (type == LDD_F_SV_TYPE_MDT)
1934                 lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
1935
1936         if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
1937                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
1938                        tgt_name(tgt),
1939                        lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
1940                 RETURN(-EINVAL);
1941         }
1942         /** Interop: evict all clients at first boot with 1.8 last_rcvd */
1943         if (type == LDD_F_SV_TYPE_MDT &&
1944             !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
1945                 if (last_rcvd_size > lsd->lsd_client_start) {
1946                         LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
1947                                       "remove all clients for interop needs\n",
1948                                       tgt_name(tgt));
1949                         rc = tgt_truncate_object(env, tgt, tgt->lut_last_rcvd,
1950                                                  lsd->lsd_client_start);
1951                         if (rc)
1952                                 RETURN(rc);
1953                         last_rcvd_size = lsd->lsd_client_start;
1954                 }
1955                 /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
1956                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
1957         }
1958
1959         spin_lock(&tgt->lut_translock);
1960         tgt->lut_last_transno = lsd->lsd_last_transno;
1961         spin_unlock(&tgt->lut_translock);
1962
1963         lsd->lsd_mount_count++;
1964
1965         CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
1966         CDEBUG(D_INODE, "%s: server last_transno: %llu\n",
1967                tgt_name(tgt), tgt->lut_last_transno);
1968         CDEBUG(D_INODE, "%s: server mount_count: %llu\n",
1969                tgt_name(tgt), lsd->lsd_mount_count);
1970         CDEBUG(D_INODE, "%s: server data size: %u\n",
1971                tgt_name(tgt), lsd->lsd_server_size);
1972         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
1973                tgt_name(tgt), lsd->lsd_client_start);
1974         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
1975                tgt_name(tgt), lsd->lsd_client_size);
1976         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
1977                tgt_name(tgt), last_rcvd_size);
1978         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
1979                tgt_name(tgt), lsd->lsd_subdir_count);
1980         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
1981                last_rcvd_size <= lsd->lsd_client_start ? 0 :
1982                (last_rcvd_size - lsd->lsd_client_start) /
1983                 lsd->lsd_client_size);
1984         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
1985
1986         if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
1987             lsd->lsd_client_size == 0) {
1988                 CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
1989                 RETURN(-EINVAL);
1990         }
1991
1992         if (!tgt->lut_obd->obd_replayable)
1993                 CWARN("%s: recovery support OFF\n", tgt_name(tgt));
1994
1995         rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
1996         if (rc < 0)
1997                 GOTO(err_client, rc);
1998
1999         spin_lock(&tgt->lut_translock);
2000         /* obd_last_committed is used for compatibility
2001          * with other lustre recovery code */
2002         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
2003         spin_unlock(&tgt->lut_translock);
2004
2005         obd2obt(tgt->lut_obd)->obt_mount_count = lsd->lsd_mount_count;
2006         obd2obt(tgt->lut_obd)->obt_instance = (__u32)lsd->lsd_mount_count;
2007
2008         /* save it, so mount count and last_transno is current */
2009         rc = tgt_server_data_update(env, tgt, 0);
2010         if (rc < 0)
2011                 GOTO(err_client, rc);
2012
2013         RETURN(0);
2014
2015 err_client:
2016         class_disconnect_exports(tgt->lut_obd);
2017         return rc;
2018 }
2019
2020 /* add credits for last_rcvd update */
2021 int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
2022                      void *cookie)
2023 {
2024         struct lu_target        *tgt = cookie;
2025         struct tgt_session_info *tsi;
2026         struct tgt_thread_info  *tti = tgt_th_info(env);
2027         struct dt_object        *dto;
2028         int                      rc;
2029
2030         /* For readonly case, the caller should have got failure
2031          * when start the transaction. If the logic comes here,
2032          * there must be something wrong. */
2033         if (unlikely(tgt->lut_bottom->dd_rdonly)) {
2034                 dump_stack();
2035                 LBUG();
2036         }
2037
2038         /* if there is no session, then this transaction is not result of
2039          * request processing but some local operation */
2040         if (env->le_ses == NULL)
2041                 return 0;
2042
2043         LASSERT(tgt->lut_last_rcvd);
2044         tsi = tgt_ses_info(env);
2045         /* OFD may start transaction without export assigned */
2046         if (tsi->tsi_exp == NULL)
2047                 return 0;
2048
2049         if (tgt_is_multimodrpcs_client(tsi->tsi_exp)) {
2050                 /*
2051                  * Use maximum possible file offset for declaration to ensure
2052                  * ZFS will reserve enough credits for a write anywhere in this
2053                  * file, since we don't know where in the file the write will be
2054                  * because a replay slot has not been assigned.  This should be
2055                  * replaced by dmu_tx_hold_append() when available.
2056                  */
2057                 tti->tti_buf.lb_buf = NULL;
2058                 tti->tti_buf.lb_len = sizeof(struct lsd_reply_data);
2059                 dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
2060                 rc = dt_declare_record_write(env, dto, &tti->tti_buf, -1, th);
2061                 if (rc)
2062                         return rc;
2063         } else {
2064                 dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
2065                 tti_buf_lcd(tti);
2066                 tti->tti_off = tsi->tsi_exp->exp_target_data.ted_lr_off;
2067                 rc = dt_declare_record_write(env, dto, &tti->tti_buf,
2068                                              tti->tti_off, th);
2069                 if (rc)
2070                         return rc;
2071         }
2072
2073         if (tsi->tsi_vbr_obj != NULL &&
2074             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
2075                 dto = dt_object_locate(tsi->tsi_vbr_obj, th->th_dev);
2076                 rc = dt_declare_version_set(env, dto, th);
2077                 if (!rc && tsi->tsi_dv_update)
2078                         rc = dt_declare_data_version_set(env, dto, th);
2079         }
2080
2081         return rc;
2082 }
2083
2084 /* Update last_rcvd records with latests transaction data */
2085 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
2086                     void *cookie)
2087 {
2088         struct lu_target        *tgt = cookie;
2089         struct tgt_session_info *tsi;
2090         struct tgt_thread_info  *tti = tgt_th_info(env);
2091         struct dt_object        *obj = NULL;
2092         int                      rc;
2093         bool                     echo_client;
2094
2095         if (env->le_ses == NULL)
2096                 return 0;
2097
2098         tsi = tgt_ses_info(env);
2099         /* OFD may start transaction without export assigned */
2100         if (tsi->tsi_exp == NULL)
2101                 return 0;
2102
2103         echo_client = (tgt_ses_req(tsi) == NULL && tsi->tsi_xid == 0);
2104
2105         if (tsi->tsi_has_trans && !echo_client && !tsi->tsi_batch_env) {
2106                 if (!tsi->tsi_mult_trans) {
2107                         CDEBUG(D_HA, "More than one transaction %llu\n",
2108                                tti->tti_transno);
2109                         /**
2110                          * if RPC handler sees unexpected multiple last_rcvd
2111                          * updates with transno, then it is better to return
2112                          * the latest transaction number to the client.
2113                          * In that case replay may fail if part of operation
2114                          * was committed and can't be re-applied easily. But
2115                          * that is better than report the first transno, in
2116                          * which case partially committed operation would be
2117                          * considered as finished so never replayed causing
2118                          * data loss.
2119                          */
2120                 }
2121                 /* we need new transno to be assigned */
2122                 tti->tti_transno = 0;
2123         }
2124
2125         if (!th->th_result)
2126                 tsi->tsi_has_trans++;
2127
2128         if (tsi->tsi_vbr_obj != NULL &&
2129             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
2130                 obj = tsi->tsi_vbr_obj;
2131         }
2132
2133         if (unlikely(echo_client)) /* echo client special case */
2134                 rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
2135                                                tsi->tsi_exp);
2136         else
2137                 rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
2138                                           tgt_ses_req(tsi));
2139         return rc;
2140 }
2141
2142 int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
2143 {
2144         struct tgt_thread_info  *tti = tgt_th_info(env);
2145         struct lsd_reply_data   *lrd = &tti->tti_lrd;
2146         unsigned long            reply_data_size;
2147         int                      rc;
2148         struct lsd_reply_header *lrh = &tgt->lut_reply_header;
2149         struct tg_reply_data    *trd = NULL;
2150         int                      idx;
2151         loff_t                   off;
2152         struct cfs_hash         *hash = NULL;
2153         struct obd_export       *exp;
2154         struct tg_export_data   *ted;
2155         int                      reply_data_recovered = 0;
2156
2157         rc = dt_attr_get(env, tgt->lut_reply_data, &tti->tti_attr);
2158         if (rc)
2159                 GOTO(out, rc);
2160         reply_data_size = (unsigned long)tti->tti_attr.la_size;
2161
2162         if (reply_data_size == 0) {
2163                 CDEBUG(D_INFO, "%s: new reply_data file, initializing\n",
2164                        tgt_name(tgt));
2165                 lrh->lrh_magic = LRH_MAGIC;
2166                 lrh->lrh_header_size = sizeof(struct lsd_reply_header);
2167                 if (lrh->lrh_magic == LRH_MAGIC_V1)
2168                         lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v1);
2169                 else
2170                         lrh->lrh_reply_size = sizeof(struct lsd_reply_data_v2);
2171                 rc = tgt_reply_header_write(env, tgt, lrh);
2172                 if (rc) {
2173                         CERROR("%s: error writing %s: rc = %d\n",
2174                                tgt_name(tgt), REPLY_DATA, rc);
2175                         GOTO(out, rc);
2176                 }
2177         } else {
2178                 __u32 recsz = sizeof(*lrd);
2179                 const char *lrd_ver = "v2";
2180
2181                 rc = tgt_reply_header_read(env, tgt, lrh);
2182                 if (rc) {
2183                         CERROR("%s: error reading %s: rc = %d\n",
2184                                tgt_name(tgt), REPLY_DATA, rc);
2185                         GOTO(out, rc);
2186                 }
2187
2188                 switch (lrh->lrh_magic) {
2189 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 5, 53, 0)
2190                 /* The old reply_data is replaced on the first mount after
2191                  * an upgrade, so no need to keep this interop code forever.
2192                  */
2193                 case LRH_MAGIC_V1:
2194                         recsz = sizeof(struct lsd_reply_data_v1);
2195                         lrd_ver = "v1";
2196
2197                         if (lrh->lrh_magic != LRH_MAGIC)
2198                                 CWARN("%s: %s record size will be %s\n",
2199                                       tgt_name(tgt), REPLY_DATA,
2200                                       lrh->lrh_magic < LRH_MAGIC ?
2201                                       "upgraded" : "downgraded");
2202                         fallthrough;
2203 #endif
2204                 case LRH_MAGIC_V2:
2205                         if (lrh->lrh_header_size != sizeof(*lrh)) {
2206                                 CERROR("%s: bad %s %s header size: %u != %lu\n",
2207                                        tgt_name(tgt), REPLY_DATA, lrd_ver,
2208                                        lrh->lrh_header_size, sizeof(*lrh));
2209                                 GOTO(out, rc = -EINVAL);
2210                         }
2211                         if (lrh->lrh_reply_size != recsz) {
2212                                 CERROR("%s: bad %s %s reply size: %u != %u\n",
2213                                 tgt_name(tgt), REPLY_DATA, lrd_ver,
2214                                 lrh->lrh_reply_size, recsz);
2215                                 GOTO(out, rc = -EINVAL);
2216                         }
2217                         break;
2218                 default:
2219                         CERROR("%s: invalid %s magic: %x != %x/%x\n",
2220                                tgt_name(tgt), REPLY_DATA,
2221                                lrh->lrh_magic, LRH_MAGIC_V1, LRH_MAGIC_V2);
2222                         GOTO(out, rc = -EINVAL);
2223                 }
2224
2225                 hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
2226                 if (hash == NULL)
2227                         GOTO(out, rc = -ENODEV);
2228
2229                 OBD_ALLOC_PTR(trd);
2230                 if (trd == NULL)
2231                         GOTO(out, rc = -ENOMEM);
2232
2233                 /* Load reply_data from disk */
2234                 for (idx = 0, off = lrh->lrh_header_size;
2235                      off < reply_data_size; idx++, off += recsz) {
2236                         rc = tgt_reply_data_read(env, tgt, lrd, off, lrh);
2237                         if (rc) {
2238                                 CERROR("%s: error reading %s: rc = %d\n",
2239                                        tgt_name(tgt), REPLY_DATA, rc);
2240                                 GOTO(out, rc);
2241                         }
2242
2243                         exp = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
2244                         if (exp == NULL) {
2245                                 /* old reply data from a disconnected client */
2246                                 continue;
2247                         }
2248                         ted = &exp->exp_target_data;
2249                         mutex_lock(&ted->ted_lcd_lock);
2250
2251                         /* create in-memory reply_data and link it to
2252                          * target export's reply list */
2253                         rc = tgt_set_reply_slot(tgt, idx);
2254                         if (rc != 0) {
2255                                 mutex_unlock(&ted->ted_lcd_lock);
2256                                 GOTO(out, rc);
2257                         }
2258                         trd->trd_reply = *lrd;
2259                         trd->trd_pre_versions[0] = 0;
2260                         trd->trd_pre_versions[1] = 0;
2261                         trd->trd_pre_versions[2] = 0;
2262                         trd->trd_pre_versions[3] = 0;
2263                         trd->trd_index = idx;
2264                         trd->trd_tag = 0;
2265                         fid_zero(&trd->trd_object);
2266                         list_add(&trd->trd_list, &ted->ted_reply_list);
2267                         ted->ted_reply_cnt++;
2268                         if (ted->ted_reply_cnt > ted->ted_reply_max)
2269                                 ted->ted_reply_max = ted->ted_reply_cnt;
2270
2271                         CDEBUG(D_HA, "%s: restore reply %p: xid %llu, "
2272                                "transno %llu, client gen %u, slot idx %d\n",
2273                                tgt_name(tgt), trd, lrd->lrd_xid,
2274                                lrd->lrd_transno, lrd->lrd_client_gen,
2275                                trd->trd_index);
2276
2277                         /* update export last committed transaction */
2278                         exp->exp_last_committed = max(exp->exp_last_committed,
2279                                                       lrd->lrd_transno);
2280                         /* Update lcd_last_transno as well for check in
2281                          * tgt_release_reply_data() or the latest client
2282                          * transno can be lost.
2283                          */
2284                         ted->ted_lcd->lcd_last_transno =
2285                                 max(ted->ted_lcd->lcd_last_transno,
2286                                     exp->exp_last_committed);
2287
2288                         mutex_unlock(&ted->ted_lcd_lock);
2289                         class_export_put(exp);
2290
2291                         /* update target last committed transaction */
2292                         spin_lock(&tgt->lut_translock);
2293                         tgt->lut_last_transno = max(tgt->lut_last_transno,
2294                                                     lrd->lrd_transno);
2295                         spin_unlock(&tgt->lut_translock);
2296
2297                         reply_data_recovered++;
2298
2299                         OBD_ALLOC_PTR(trd);
2300                         if (trd == NULL)
2301                                 GOTO(out, rc = -ENOMEM);
2302                 }
2303                 CDEBUG(D_INFO, "%s: %d reply data have been recovered\n",
2304                        tgt_name(tgt), reply_data_recovered);
2305         }
2306
2307         spin_lock(&tgt->lut_translock);
2308         /* obd_last_committed is used for compatibility
2309          * with other lustre recovery code */
2310         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
2311         spin_unlock(&tgt->lut_translock);
2312
2313         rc = 0;
2314
2315 out:
2316         if (hash != NULL)
2317                 cfs_hash_putref(hash);
2318         if (trd != NULL)
2319                 OBD_FREE_PTR(trd);
2320         return rc;
2321 }
2322
2323 static int tgt_check_lookup_req(struct ptlrpc_request *req, int lookup,
2324                                 struct tg_reply_data *trd)
2325 {
2326         struct tg_export_data *ted = &req->rq_export->exp_target_data;
2327         struct lu_target *lut = class_exp2tgt(req->rq_export);
2328         __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
2329         int rc = 0;
2330         struct tg_reply_data *reply;
2331         bool check_increasing;
2332
2333         if (tag == 0)
2334                 return 0;
2335
2336         check_increasing = tgt_is_increasing_xid_client(req->rq_export) &&
2337                            !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2338         if (!lookup && !check_increasing)
2339                 return 0;
2340
2341         list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
2342                 if (lookup && reply->trd_reply.lrd_xid == req->rq_xid) {
2343                         rc = 1;
2344                         if (trd != NULL)
2345                                 *trd = *reply;
2346                         break;
2347                 } else if (check_increasing && reply->trd_tag == tag &&
2348                            reply->trd_reply.lrd_xid > req->rq_xid) {
2349                         rc = -EPROTO;
2350                         CERROR("%s: busy tag=%u req_xid=%llu, trd=%p: xid=%llu transno=%llu client_gen=%u slot_idx=%d: rc = %d\n",
2351                                tgt_name(lut), tag, req->rq_xid, trd,
2352                                reply->trd_reply.lrd_xid,
2353                                reply->trd_reply.lrd_transno,
2354                                reply->trd_reply.lrd_client_gen,
2355                                reply->trd_index, rc);
2356                         break;
2357                 }
2358         }
2359
2360         return rc;
2361 }
2362
2363 /* Look for a reply data matching specified request @req
2364  * A copy is returned in @trd if the pointer is not NULL
2365  */
2366 int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
2367 {
2368         struct tg_export_data *ted = &req->rq_export->exp_target_data;
2369         int found = 0;
2370         bool not_replay = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2371
2372         mutex_lock(&ted->ted_lcd_lock);
2373         if (not_replay && req->rq_xid <= req->rq_export->exp_last_xid) {
2374                 /* A check for the last_xid is needed here in case there is
2375                  * no reply data is left in the list. It may happen if another
2376                  * RPC on another slot increased the last_xid between our
2377                  * process_req_last_xid & tgt_lookup_reply calls */
2378                 found = -EPROTO;
2379         } else {
2380                 found = tgt_check_lookup_req(req, 1, trd);
2381         }
2382         mutex_unlock(&ted->ted_lcd_lock);
2383
2384         CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d last_xid %llu\n",
2385                tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, found,
2386                req->rq_export->exp_last_xid);
2387
2388         return found;
2389 }
2390 EXPORT_SYMBOL(tgt_lookup_reply);
2391
2392 int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
2393 {
2394         struct tg_export_data   *ted = &exp->exp_target_data;
2395         struct lu_target        *lut = class_exp2tgt(exp);
2396         struct tg_reply_data    *trd, *tmp;
2397
2398
2399         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
2400                 if (trd->trd_reply.lrd_xid > rcvd_xid)
2401                         continue;
2402                 ted->ted_release_xid++;
2403                 tgt_release_reply_data(lut, ted, trd);
2404         }
2405
2406         return 0;
2407 }
2408
2409 int tgt_handle_tag(struct ptlrpc_request *req)
2410 {
2411         return tgt_check_lookup_req(req, 0, NULL);
2412 }
2413