Whamcloud - gitweb
LU-12610 target: remove OBD_ -> CFS_ macros
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * Lustre Unified Target
32  * These are common functions for working with the last_received file
33  *
34  * Author: Mikhail Pershin <mike.pershin@intel.com>
35  */
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_fid.h>
39
40 #include "tgt_internal.h"
41
42 /** version recovery epoch */
43 #define LR_EPOCH_BITS   32
44
45 /* Allocate a bitmap for a chunk of reply data slots */
46 static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
47 {
48         unsigned long *bm;
49
50         OBD_ALLOC_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
51                         sizeof(long));
52         if (bm == NULL)
53                 return -ENOMEM;
54
55         spin_lock(&lut->lut_client_bitmap_lock);
56
57         if (lut->lut_reply_bitmap[chunk] != NULL) {
58                 /* someone else already allocated the bitmap for this chunk */
59                 spin_unlock(&lut->lut_client_bitmap_lock);
60                 OBD_FREE_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
61                          sizeof(long));
62                 return 0;
63         }
64
65         lut->lut_reply_bitmap[chunk] = bm;
66
67         spin_unlock(&lut->lut_client_bitmap_lock);
68
69         return 0;
70 }
71
72 /* Look for an available reply data slot in the bitmap
73  * of the target @lut
74  * Allocate bitmap chunk when first used
75  * XXX algo could be improved if this routine limits performance
76  */
77 static int tgt_find_free_reply_slot(struct lu_target *lut)
78 {
79         unsigned long *bmp;
80         int chunk = 0;
81         int rc;
82         int b;
83
84         for (chunk = 0; chunk < LUT_REPLY_SLOTS_MAX_CHUNKS; chunk++) {
85                 /* allocate the bitmap chunk if necessary */
86                 if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
87                         rc = tgt_bitmap_chunk_alloc(lut, chunk);
88                         if (rc != 0)
89                                 return rc;
90                 }
91                 bmp = lut->lut_reply_bitmap[chunk];
92
93                 /* look for an available slot in this chunk */
94                 do {
95                         b = find_first_zero_bit(bmp, LUT_REPLY_SLOTS_PER_CHUNK);
96                         if (b >= LUT_REPLY_SLOTS_PER_CHUNK)
97                                 break;
98
99                         /* found one */
100                         if (test_and_set_bit(b, bmp) == 0)
101                                 return chunk * LUT_REPLY_SLOTS_PER_CHUNK + b;
102                 } while (true);
103         }
104
105         return -ENOSPC;
106 }
107
108 /* Mark the reply data slot @idx 'used' in the corresponding bitmap chunk
109  * of the target @lut
110  * Allocate the bitmap chunk if necessary
111  */
112 static int tgt_set_reply_slot(struct lu_target *lut, int idx)
113 {
114         int chunk;
115         int b;
116         int rc;
117
118         chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
119         b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
120
121         LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
122         LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
123
124         /* allocate the bitmap chunk if necessary */
125         if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
126                 rc = tgt_bitmap_chunk_alloc(lut, chunk);
127                 if (rc != 0)
128                         return rc;
129         }
130
131         /* mark the slot 'used' in this chunk */
132         if (test_and_set_bit(b, lut->lut_reply_bitmap[chunk]) != 0) {
133                 CERROR("%s: slot %d already set in bitmap\n",
134                        tgt_name(lut), idx);
135                 return -EALREADY;
136         }
137
138         return 0;
139 }
140
141
142 /* Mark the reply data slot @idx 'unused' in the corresponding bitmap chunk
143  * of the target @lut
144  */
145 static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
146 {
147         int chunk;
148         int b;
149
150         if (lut->lut_obd->obd_stopping)
151                 /*
152                  * in case of failover keep the bit set in order to
153                  * avoid overwriting slots in reply_data which might
154                  * be required by resent rpcs
155                  */
156                 return 0;
157         chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
158         b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
159
160         LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
161         LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
162
163         if (lut->lut_reply_bitmap[chunk] == NULL) {
164                 CERROR("%s: slot %d not allocated\n",
165                        tgt_name(lut), idx);
166                 return -ENOENT;
167         }
168
169         if (test_and_clear_bit(b, lut->lut_reply_bitmap[chunk]) == 0) {
170                 CERROR("%s: slot %d already clear in bitmap\n",
171                        tgt_name(lut), idx);
172                 return -EALREADY;
173         }
174
175         return 0;
176 }
177
178
179 /* Read header of reply_data file of target @tgt into structure @lrh */
180 static int tgt_reply_header_read(const struct lu_env *env,
181                                  struct lu_target *tgt,
182                                  struct lsd_reply_header *lrh)
183 {
184         int                      rc;
185         struct lsd_reply_header  buf;
186         struct tgt_thread_info  *tti = tgt_th_info(env);
187
188         tti->tti_off = 0;
189         tti->tti_buf.lb_buf = &buf;
190         tti->tti_buf.lb_len = sizeof(buf);
191
192         rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
193                             &tti->tti_off);
194         if (rc != 0)
195                 return rc;
196
197         lrh->lrh_magic = le32_to_cpu(buf.lrh_magic);
198         lrh->lrh_header_size = le32_to_cpu(buf.lrh_header_size);
199         lrh->lrh_reply_size = le32_to_cpu(buf.lrh_reply_size);
200
201         CDEBUG(D_HA, "%s: read %s header. magic=0x%08x "
202                "header_size=%d reply_size=%d\n",
203                 tgt->lut_obd->obd_name, REPLY_DATA,
204                 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
205
206         return 0;
207 }
208
209 /* Write header into replay_data file of target @tgt from structure @lrh */
210 static int tgt_reply_header_write(const struct lu_env *env,
211                                   struct lu_target *tgt,
212                                   struct lsd_reply_header *lrh)
213 {
214         int                      rc;
215         struct lsd_reply_header  buf;
216         struct tgt_thread_info  *tti = tgt_th_info(env);
217         struct thandle          *th;
218         struct dt_object        *dto;
219
220         CDEBUG(D_HA, "%s: write %s header. magic=0x%08x "
221                "header_size=%d reply_size=%d\n",
222                 tgt->lut_obd->obd_name, REPLY_DATA,
223                 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
224
225         if (tgt->lut_bottom->dd_rdonly)
226                 RETURN(0);
227
228         buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
229         buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
230         buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
231
232         th = dt_trans_create(env, tgt->lut_bottom);
233         if (IS_ERR(th))
234                 return PTR_ERR(th);
235         th->th_sync = 1;
236
237         tti->tti_off = 0;
238         tti->tti_buf.lb_buf = &buf;
239         tti->tti_buf.lb_len = sizeof(buf);
240
241         rc = dt_declare_record_write(env, tgt->lut_reply_data,
242                                      &tti->tti_buf, tti->tti_off, th);
243         if (rc)
244                 GOTO(out, rc);
245
246         rc = dt_trans_start(env, tgt->lut_bottom, th);
247         if (rc)
248                 GOTO(out, rc);
249
250         dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
251         rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
252 out:
253         dt_trans_stop(env, tgt->lut_bottom, th);
254         return rc;
255 }
256
257 /* Write the reply data @lrd into reply_data file of target @tgt
258  * at offset @off
259  */
260 static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
261                                 struct lsd_reply_data *lrd, loff_t off,
262                                 struct thandle *th)
263 {
264         struct tgt_thread_info  *tti = tgt_th_info(env);
265         struct dt_object        *dto;
266         struct lsd_reply_data   *buf = &tti->tti_lrd;
267
268         lrd->lrd_result = ptlrpc_status_hton(lrd->lrd_result);
269
270         buf->lrd_transno         = cpu_to_le64(lrd->lrd_transno);
271         buf->lrd_xid             = cpu_to_le64(lrd->lrd_xid);
272         buf->lrd_data            = cpu_to_le64(lrd->lrd_data);
273         buf->lrd_result          = cpu_to_le32(lrd->lrd_result);
274         buf->lrd_client_gen      = cpu_to_le32(lrd->lrd_client_gen);
275
276         lrd->lrd_result = ptlrpc_status_ntoh(lrd->lrd_result);
277
278         tti->tti_off = off;
279         tti->tti_buf.lb_buf = buf;
280         tti->tti_buf.lb_len = sizeof(*buf);
281
282         dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
283         return dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
284 }
285
286 /* Read the reply data from reply_data file of target @tgt at offset @off
287  * into structure @lrd
288  */
289 static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt,
290                                struct lsd_reply_data *lrd, loff_t off,
291                                __u32 magic)
292 {
293         struct tgt_thread_info *tti = tgt_th_info(env);
294         struct lsd_reply_data *buf = &tti->tti_lrd;
295         int rc;
296
297         tti->tti_off = off;
298         tti->tti_buf.lb_buf = buf;
299
300         if (magic == LRH_MAGIC)
301                 tti->tti_buf.lb_len = sizeof(*buf);
302         else if (magic == LRH_MAGIC_V1)
303                 tti->tti_buf.lb_len = sizeof(struct lsd_reply_data_v1);
304         else
305                 return -EINVAL;
306
307         rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
308                             &tti->tti_off);
309         if (rc != 0)
310                 return rc;
311
312         lrd->lrd_transno = le64_to_cpu(buf->lrd_transno);
313         lrd->lrd_xid = le64_to_cpu(buf->lrd_xid);
314         lrd->lrd_data = le64_to_cpu(buf->lrd_data);
315         lrd->lrd_result = le32_to_cpu(buf->lrd_result);
316         lrd->lrd_client_gen = le32_to_cpu(buf->lrd_client_gen);
317
318         if (magic == LRH_MAGIC)
319                 lrd->lrd_batch_idx = le32_to_cpu(buf->lrd_batch_idx);
320         else
321                 lrd->lrd_batch_idx = 0;
322
323         return 0;
324 }
325
326 /* Free the in-memory reply data structure @trd and release
327  * the corresponding slot in the reply_data file of target @lut
328  * Called with ted_lcd_lock held
329  */
330 static void tgt_free_reply_data(struct lu_target *lut,
331                                 struct tg_export_data *ted,
332                                 struct tg_reply_data *trd)
333 {
334         CDEBUG(D_TRACE, "%s: free reply data %p: xid %llu, transno %llu, "
335                "client gen %u, slot idx %d\n",
336                lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
337                trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
338                trd->trd_index);
339
340         LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
341
342         list_del(&trd->trd_list);
343         ted->ted_reply_cnt--;
344         if (lut != NULL && trd->trd_index != TRD_INDEX_MEMORY)
345                 tgt_clear_reply_slot(lut, trd->trd_index);
346         OBD_FREE_PTR(trd);
347 }
348
349 /* Release the reply data @trd from target @lut
350  * The reply data with the highest transno for this export
351  * is retained to ensure correctness of target recovery
352  * Called with ted_lcd_lock held
353  */
354 static void tgt_release_reply_data(struct lu_target *lut,
355                                    struct tg_export_data *ted,
356                                    struct tg_reply_data *trd)
357 {
358         CDEBUG(D_TRACE, "%s: release reply data %p: xid %llu, transno %llu, "
359                "client gen %u, slot idx %d\n",
360                lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
361                trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
362                trd->trd_index);
363
364         LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
365
366         /* Do not free the reply data corresponding to the
367          * highest transno of this export.
368          * This ensures on-disk reply data is kept and
369          * last committed transno can be restored from disk in case
370          * of target recovery
371          */
372         if (trd->trd_reply.lrd_transno == ted->ted_lcd->lcd_last_transno) {
373                 /* free previous retained reply */
374                 if (ted->ted_reply_last != NULL)
375                         tgt_free_reply_data(lut, ted, ted->ted_reply_last);
376                 /* retain the reply */
377                 list_del_init(&trd->trd_list);
378                 ted->ted_reply_last = trd;
379         } else {
380                 tgt_free_reply_data(lut, ted, trd);
381         }
382 }
383
384 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
385 {
386         tti->tti_buf.lb_buf = &tti->tti_lsd;
387         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
388         return &tti->tti_buf;
389 }
390
391 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
392 {
393         tti->tti_buf.lb_buf = &tti->tti_lcd;
394         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
395         return &tti->tti_buf;
396 }
397
398 static inline bool tgt_is_multimodrpcs_record(struct lu_target *tgt,
399                                               struct lsd_client_data *lcd)
400 {
401         return tgt->lut_lsd.lsd_feature_incompat & OBD_INCOMPAT_MULTI_RPCS &&
402                 lcd->lcd_generation != 0;
403 }
404
405 /**
406  * Allocate in-memory data for client slot related to export.
407  */
408 int tgt_client_alloc(struct obd_export *exp)
409 {
410         ENTRY;
411         LASSERT(exp != exp->exp_obd->obd_self_export);
412
413         spin_lock_init(&exp->exp_target_data.ted_nodemap_lock);
414         INIT_LIST_HEAD(&exp->exp_target_data.ted_nodemap_member);
415         spin_lock_init(&exp->exp_target_data.ted_fmd_lock);
416         INIT_LIST_HEAD(&exp->exp_target_data.ted_fmd_list);
417
418         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
419         if (exp->exp_target_data.ted_lcd == NULL)
420                 RETURN(-ENOMEM);
421         /* Mark that slot is not yet valid, 0 doesn't work here */
422         exp->exp_target_data.ted_lr_idx = -1;
423         INIT_LIST_HEAD(&exp->exp_target_data.ted_reply_list);
424         mutex_init(&exp->exp_target_data.ted_lcd_lock);
425         RETURN(0);
426 }
427 EXPORT_SYMBOL(tgt_client_alloc);
428
429 /**
430  * Free in-memory data for client slot related to export.
431  */
432 void tgt_client_free(struct obd_export *exp)
433 {
434         struct tg_export_data   *ted = &exp->exp_target_data;
435         struct lu_target        *lut = class_exp2tgt(exp);
436         struct tg_reply_data    *trd, *tmp;
437
438         LASSERT(exp != exp->exp_obd->obd_self_export);
439
440         tgt_fmd_cleanup(exp);
441
442         /* free reply data */
443         mutex_lock(&ted->ted_lcd_lock);
444         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
445                 tgt_release_reply_data(lut, ted, trd);
446         }
447         if (ted->ted_reply_last != NULL) {
448                 tgt_free_reply_data(lut, ted, ted->ted_reply_last);
449                 ted->ted_reply_last = NULL;
450         }
451         mutex_unlock(&ted->ted_lcd_lock);
452
453         if (!hlist_unhashed(&exp->exp_gen_hash))
454                 cfs_hash_del(exp->exp_obd->obd_gen_hash,
455                              &ted->ted_lcd->lcd_generation,
456                              &exp->exp_gen_hash);
457
458         OBD_FREE_PTR(ted->ted_lcd);
459         ted->ted_lcd = NULL;
460
461         /* Target may have been freed (see LU-7430)
462          * Slot may be not yet assigned */
463         if (((struct obd_device_target *)(&exp->exp_obd->u))->obt_magic !=
464             OBT_MAGIC ||
465             ted->ted_lr_idx < 0)
466                 return;
467
468         /* Clear bit when lcd is freed */
469         LASSERT(lut && lut->lut_client_bitmap);
470         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
471                 CERROR("%s: client %u bit already clear in bitmap\n",
472                        exp->exp_obd->obd_name, ted->ted_lr_idx);
473                 LBUG();
474         }
475 }
476 EXPORT_SYMBOL(tgt_client_free);
477
478 static inline void tgt_check_lcd(const char *obd_name, int index,
479                                  struct lsd_client_data *lcd)
480 {
481         size_t uuid_size = sizeof(lcd->lcd_uuid);
482
483         if (strnlen((char*)lcd->lcd_uuid, uuid_size) == uuid_size) {
484                 lcd->lcd_uuid[uuid_size - 1] = '\0';
485
486                 LCONSOLE_ERROR("the client UUID (%s) on %s for exports stored in last_rcvd(index = %d) is bad!\n",
487                                lcd->lcd_uuid, obd_name, index);
488         }
489 }
490
491 static int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
492                                 struct lsd_client_data *lcd,
493                                 loff_t *off, int index)
494 {
495         struct tgt_thread_info  *tti = tgt_th_info(env);
496         int                      rc;
497
498         tti_buf_lcd(tti);
499         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
500         if (rc == 0) {
501                 tgt_check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
502                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
503                 lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
504                 lcd->lcd_last_close_result =
505                         ptlrpc_status_ntoh(lcd->lcd_last_close_result);
506         }
507
508         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = %llu"
509                ", last_xid = %llu, last_result = %u, last_data = %u, "
510                "last_close_transno = %llu, last_close_xid = %llu, "
511                "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
512                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
513                lcd->lcd_last_result, lcd->lcd_last_data,
514                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
515                lcd->lcd_last_close_result, rc);
516         return rc;
517 }
518
519 static int tgt_client_data_write(const struct lu_env *env,
520                                  struct lu_target *tgt,
521                                  struct lsd_client_data *lcd,
522                                  loff_t *off, struct thandle *th)
523 {
524         struct tgt_thread_info *tti = tgt_th_info(env);
525         struct dt_object        *dto;
526
527         lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
528         lcd->lcd_last_close_result =
529                 ptlrpc_status_hton(lcd->lcd_last_close_result);
530         lcd_cpu_to_le(lcd, &tti->tti_lcd);
531         tti_buf_lcd(tti);
532
533         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
534         return dt_record_write(env, dto, &tti->tti_buf, off, th);
535 }
536
537 struct tgt_new_client_callback {
538         struct dt_txn_commit_cb  lncc_cb;
539         struct obd_export       *lncc_exp;
540 };
541
542 static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
543                               struct dt_txn_commit_cb *cb, int err)
544 {
545         struct tgt_new_client_callback *ccb;
546
547         ccb = container_of(cb, struct tgt_new_client_callback, lncc_cb);
548
549         LASSERT(ccb->lncc_exp->exp_obd);
550
551         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
552                ccb->lncc_exp->exp_obd->obd_name,
553                ccb->lncc_exp->exp_client_uuid.uuid);
554
555         spin_lock(&ccb->lncc_exp->exp_lock);
556
557         ccb->lncc_exp->exp_need_sync = 0;
558
559         spin_unlock(&ccb->lncc_exp->exp_lock);
560         class_export_cb_put(ccb->lncc_exp);
561
562         OBD_FREE_PTR(ccb);
563 }
564
565 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
566 {
567         struct tgt_new_client_callback  *ccb;
568         struct dt_txn_commit_cb         *dcb;
569         int                              rc;
570
571         OBD_ALLOC_PTR(ccb);
572         if (ccb == NULL)
573                 return -ENOMEM;
574
575         ccb->lncc_exp = class_export_cb_get(exp);
576
577         dcb = &ccb->lncc_cb;
578         dcb->dcb_func = tgt_cb_new_client;
579         INIT_LIST_HEAD(&dcb->dcb_linkage);
580         strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
581
582         rc = dt_trans_cb_add(th, dcb);
583         if (rc) {
584                 class_export_cb_put(exp);
585                 OBD_FREE_PTR(ccb);
586         }
587         return rc;
588 }
589
590 /**
591  * Update client data in last_rcvd
592  */
593 static int tgt_client_data_update(const struct lu_env *env,
594                                   struct obd_export *exp)
595 {
596         struct tg_export_data   *ted = &exp->exp_target_data;
597         struct lu_target        *tgt = class_exp2tgt(exp);
598         struct tgt_thread_info  *tti = tgt_th_info(env);
599         struct thandle          *th;
600         int                      rc = 0;
601
602         ENTRY;
603
604         if (unlikely(tgt == NULL)) {
605                 CDEBUG(D_ERROR, "%s: No target for connected export\n",
606                           class_exp2obd(exp)->obd_name);
607                 RETURN(-EINVAL);
608         }
609
610         if (tgt->lut_bottom->dd_rdonly)
611                 RETURN(0);
612
613         th = dt_trans_create(env, tgt->lut_bottom);
614         if (IS_ERR(th))
615                 RETURN(PTR_ERR(th));
616
617         tti_buf_lcd(tti);
618         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
619                                      &tti->tti_buf,
620                                      ted->ted_lr_off, th);
621         if (rc)
622                 GOTO(out, rc);
623
624         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
625         if (rc)
626                 GOTO(out, rc);
627
628         mutex_lock(&ted->ted_lcd_lock);
629
630         /*
631          * Until this operations will be committed the sync is needed
632          * for this export. This should be done _after_ starting the
633          * transaction so that many connecting clients will not bring
634          * server down with lots of sync writes.
635          */
636         rc = tgt_new_client_cb_add(th, exp);
637         if (rc) {
638                 /* can't add callback, do sync now */
639                 th->th_sync = 1;
640         } else {
641                 spin_lock(&exp->exp_lock);
642                 exp->exp_need_sync = 1;
643                 spin_unlock(&exp->exp_lock);
644         }
645
646         tti->tti_off = ted->ted_lr_off;
647         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
648
649         mutex_unlock(&ted->ted_lcd_lock);
650
651         EXIT;
652 out:
653         dt_trans_stop(env, tgt->lut_bottom, th);
654         CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
655                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
656                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
657
658         return rc;
659 }
660
661 static int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
662 {
663         struct tgt_thread_info  *tti = tgt_th_info(env);
664         int                      rc;
665
666         tti->tti_off = 0;
667         tti_buf_lsd(tti);
668         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
669                             &tti->tti_off);
670         if (rc == 0)
671                 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
672
673         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
674                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
675                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
676         return rc;
677 }
678
679 static int tgt_server_data_write(const struct lu_env *env,
680                                  struct lu_target *tgt, struct thandle *th)
681 {
682         struct tgt_thread_info  *tti = tgt_th_info(env);
683         struct dt_object        *dto;
684         int                      rc;
685
686         ENTRY;
687
688         tti->tti_off = 0;
689         tti_buf_lsd(tti);
690         lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
691
692         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
693         rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
694
695         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
696                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
697                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
698
699         RETURN(rc);
700 }
701
702 /**
703  * Update server data in last_rcvd
704  */
705 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
706                            int sync)
707 {
708         struct tgt_thread_info  *tti = tgt_th_info(env);
709         struct thandle          *th;
710         int                      rc = 0;
711
712         ENTRY;
713
714         CDEBUG(D_SUPER,
715                "%s: mount_count is %llu, last_transno is %llu\n",
716                tgt->lut_lsd.lsd_uuid, obd2obt(tgt->lut_obd)->obt_mount_count,
717                tgt->lut_last_transno);
718
719         /* Always save latest transno to keep it fresh */
720         spin_lock(&tgt->lut_translock);
721         tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
722         spin_unlock(&tgt->lut_translock);
723
724         if (tgt->lut_bottom->dd_rdonly)
725                 RETURN(0);
726
727         th = dt_trans_create(env, tgt->lut_bottom);
728         if (IS_ERR(th))
729                 RETURN(PTR_ERR(th));
730
731         th->th_sync = sync;
732
733         tti_buf_lsd(tti);
734         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
735                                      &tti->tti_buf, tti->tti_off, th);
736         if (rc)
737                 GOTO(out, rc);
738
739         rc = dt_trans_start(env, tgt->lut_bottom, th);
740         if (rc)
741                 GOTO(out, rc);
742
743         rc = tgt_server_data_write(env, tgt, th);
744 out:
745         dt_trans_stop(env, tgt->lut_bottom, th);
746
747         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
748                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
749                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
750         RETURN(rc);
751 }
752 EXPORT_SYMBOL(tgt_server_data_update);
753
754 static int tgt_truncate_object(const struct lu_env *env, struct lu_target *tgt,
755                                struct dt_object *dt, loff_t size)
756 {
757         struct thandle   *th;
758         struct lu_attr    attr;
759         int               rc;
760
761         ENTRY;
762
763         if (tgt->lut_bottom->dd_rdonly)
764                 RETURN(0);
765
766         attr.la_size = size;
767         attr.la_valid = LA_SIZE;
768
769         th = dt_trans_create(env, tgt->lut_bottom);
770         if (IS_ERR(th))
771                 RETURN(PTR_ERR(th));
772         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
773         if (rc)
774                 GOTO(cleanup, rc);
775         rc = dt_declare_attr_set(env, dt, &attr, th);
776         if (rc)
777                 GOTO(cleanup, rc);
778         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
779         if (rc)
780                 GOTO(cleanup, rc);
781
782         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th);
783         if (rc == 0)
784                 rc = dt_attr_set(env, dt, &attr, th);
785
786 cleanup:
787         dt_trans_stop(env, tgt->lut_bottom, th);
788
789         RETURN(rc);
790 }
791
792 static void tgt_client_epoch_update(const struct lu_env *env,
793                                     struct obd_export *exp)
794 {
795         struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
796         struct lu_target        *tgt = class_exp2tgt(exp);
797
798         LASSERT(tgt && tgt->lut_bottom);
799         /** VBR: set client last_epoch to current epoch */
800         if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
801                 return;
802         lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
803         tgt_client_data_update(env, exp);
804 }
805
806 static int tgt_reply_data_upgrade_check(const struct lu_env *env,
807                                         struct lu_target *tgt)
808 {
809         struct lsd_reply_header lrh;
810         int rc;
811
812         /*
813          * Reply data is supported by MDT targets only for now.
814          * When reply data object @lut_reply_data is NULL, it indicates the
815          * target type is OST and it should skip the upgrade check.
816          */
817         if (tgt->lut_reply_data == NULL)
818                 RETURN(0);
819
820         rc = tgt_reply_header_read(env, tgt, &lrh);
821         if (rc) {
822                 CERROR("%s: failed to read %s: rc = %d\n",
823                        tgt_name(tgt), REPLY_DATA, rc);
824                 RETURN(rc);
825         }
826
827         if (lrh.lrh_magic == LRH_MAGIC)
828                 RETURN(0);
829
830         rc = tgt_truncate_object(env, tgt, tgt->lut_reply_data, 0);
831         if (rc) {
832                 CERROR("%s: failed to truncate %s: rc = %d\n",
833                        tgt_name(tgt), REPLY_DATA, rc);
834                 RETURN(rc);
835         }
836
837         lrh.lrh_magic = LRH_MAGIC;
838         lrh.lrh_header_size = sizeof(struct lsd_reply_header);
839         lrh.lrh_reply_size = sizeof(struct lsd_reply_data);
840         rc = tgt_reply_header_write(env, tgt, &lrh);
841         if (rc)
842                 CERROR("%s: failed to write header for %s: rc = %d\n",
843                        tgt_name(tgt), REPLY_DATA, rc);
844
845         RETURN(rc);
846 }
847
848 /**
849  * Update boot epoch when recovery ends
850  */
851 void tgt_boot_epoch_update(struct lu_target *tgt)
852 {
853         struct lu_env            env;
854         struct ptlrpc_request   *req;
855         __u32                    start_epoch;
856         LIST_HEAD(client_list);
857         int                      rc;
858
859         if (tgt->lut_obd->obd_stopping)
860                 return;
861
862         rc = lu_env_init(&env, LCT_DT_THREAD);
863         if (rc) {
864                 CERROR("%s: can't initialize environment: rc = %d\n",
865                         tgt->lut_obd->obd_name, rc);
866                 return;
867         }
868
869         spin_lock(&tgt->lut_translock);
870         start_epoch = (tgt->lut_last_transno >> LR_EPOCH_BITS) + 1;
871         tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
872         tgt->lut_lsd.lsd_start_epoch = start_epoch;
873         spin_unlock(&tgt->lut_translock);
874
875         /**
876          * The recovery is not yet finished and final queue can still be updated
877          * with resend requests. Move final list to separate one for processing
878          */
879         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
880         list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
881         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
882
883         /**
884          * go through list of exports participated in recovery and
885          * set new epoch for them
886          */
887         list_for_each_entry(req, &client_list, rq_list) {
888                 LASSERT(!req->rq_export->exp_delayed);
889                 if (!req->rq_export->exp_vbr_failed)
890                         tgt_client_epoch_update(&env, req->rq_export);
891         }
892         /** return list back at once */
893         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
894         list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
895         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
896
897         /**
898          * Clear MULTI RPCS incompatibility flag if there is no multi-rpcs
899          * client in last_rcvd file
900          */
901         if (atomic_read(&tgt->lut_num_clients) == 0)
902                 tgt->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
903
904         /** update server epoch */
905         tgt_server_data_update(&env, tgt, 1);
906         tgt_reply_data_upgrade_check(&env, tgt);
907         lu_env_fini(&env);
908 }
909
910 /**
911  * commit callback, need to update last_committed value
912  */
913 struct tgt_last_committed_callback {
914         struct dt_txn_commit_cb  llcc_cb;
915         struct lu_target        *llcc_tgt;
916         struct obd_export       *llcc_exp;
917         __u64                    llcc_transno;
918 };
919
920 static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
921                                   struct dt_txn_commit_cb *cb, int err)
922 {
923         struct tgt_last_committed_callback *ccb;
924
925         ccb = container_of(cb, struct tgt_last_committed_callback, llcc_cb);
926
927         LASSERT(ccb->llcc_exp);
928         LASSERT(ccb->llcc_tgt != NULL);
929         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
930
931         if (th->th_reserved_quota.lqi_space > 0) {
932                 CDEBUG(D_QUOTA, "free quota %llu %llu\n",
933                        th->th_reserved_quota.lqi_id.qid_gid,
934                        th->th_reserved_quota.lqi_space);
935
936                 /* env can be NULL for freeing reserved quota */
937                 th->th_reserved_quota.lqi_space *= -1;
938                 dt_reserve_or_free_quota(NULL, th->th_dev,
939                                          &th->th_reserved_quota);
940         }
941
942         /* error hit, don't update last committed to provide chance to
943          * replay data after fail */
944         if (err != 0)
945                 goto out;
946
947         /* Fast path w/o spinlock, if exp_last_committed was updated
948          * with higher transno, no need to take spinlock and check,
949          * also no need to update obd_last_committed. */
950         if (ccb->llcc_transno <= ccb->llcc_exp->exp_last_committed)
951                 goto out;
952         spin_lock(&ccb->llcc_tgt->lut_translock);
953         if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
954                 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
955
956         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
957                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
958                 spin_unlock(&ccb->llcc_tgt->lut_translock);
959
960                 ptlrpc_commit_replies(ccb->llcc_exp);
961                 tgt_cancel_slc_locks(ccb->llcc_tgt, ccb->llcc_transno);
962         } else {
963                 spin_unlock(&ccb->llcc_tgt->lut_translock);
964         }
965
966         CDEBUG(D_HA, "%s: transno %lld is committed\n",
967                ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
968
969 out:
970         class_export_cb_put(ccb->llcc_exp);
971         OBD_FREE_PTR(ccb);
972 }
973
974 /**
975  * Add commit callback function, it returns a non-zero value to inform
976  * caller to use sync transaction if necessary.
977  */
978 static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
979                                   struct obd_export *exp, __u64 transno)
980 {
981         struct tgt_last_committed_callback      *ccb;
982         struct dt_txn_commit_cb                 *dcb;
983         int                                      rc;
984
985         OBD_ALLOC_PTR(ccb);
986         if (ccb == NULL)
987                 return -ENOMEM;
988
989         ccb->llcc_tgt = tgt;
990         ccb->llcc_exp = class_export_cb_get(exp);
991         ccb->llcc_transno = transno;
992
993         dcb = &ccb->llcc_cb;
994         dcb->dcb_func = tgt_cb_last_committed;
995         INIT_LIST_HEAD(&dcb->dcb_linkage);
996         strlcpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
997
998         rc = dt_trans_cb_add(th, dcb);
999         if (rc) {
1000                 class_export_cb_put(exp);
1001                 OBD_FREE_PTR(ccb);
1002         }
1003
1004         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1005                 /* report failure to force synchronous operation */
1006                 return -EPERM;
1007
1008         /* if exp_need_sync is set, return non-zero value to force
1009          * a sync transaction. */
1010         return rc ? rc : exp->exp_need_sync;
1011 }
1012
1013 static int tgt_is_local_client(const struct lu_env *env,
1014                                       struct obd_export *exp)
1015 {
1016         struct lu_target        *tgt = class_exp2tgt(exp);
1017         struct tgt_session_info *tsi = tgt_ses_info(env);
1018         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1019
1020         if (exp_connect_flags(exp) & OBD_CONNECT_MDS ||
1021             exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
1022                 return 0;
1023         if (tgt->lut_local_recovery)
1024                 return 0;
1025         if (!req)
1026                 return 0;
1027         if (!LNetIsPeerLocal(&req->rq_peer.nid))
1028                 return 0;
1029
1030         return 1;
1031 }
1032
1033 /**
1034  * Add new client to the last_rcvd upon new connection.
1035  *
1036  * We use a bitmap to locate a free space in the last_rcvd file and initialize
1037  * tg_export_data.
1038  */
1039 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
1040 {
1041         struct tg_export_data   *ted = &exp->exp_target_data;
1042         struct lu_target        *tgt = class_exp2tgt(exp);
1043         int                      rc = 0, idx;
1044
1045         ENTRY;
1046
1047         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
1048         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
1049                 RETURN(0);
1050
1051         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1052                 RETURN(0);
1053
1054         if (tgt_is_local_client(env, exp)) {
1055                 LCONSOLE_WARN("%s: local client %s w/o recovery\n",
1056                               exp->exp_obd->obd_name, ted->ted_lcd->lcd_uuid);
1057                 exp->exp_no_recovery = 1;
1058                 RETURN(0);
1059         }
1060
1061         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
1062          * there's no need for extra complication here
1063          */
1064         idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
1065 repeat:
1066         if (idx >= LR_MAX_CLIENTS ||
1067             CFS_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
1068                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
1069                        tgt->lut_obd->obd_name,  idx);
1070                 RETURN(-EOVERFLOW);
1071         }
1072         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1073                 idx = find_next_zero_bit(tgt->lut_client_bitmap,
1074                                              LR_MAX_CLIENTS, idx);
1075                 goto repeat;
1076         }
1077
1078         ted->ted_lr_idx = idx;
1079         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1080                           idx * tgt->lut_lsd.lsd_client_size;
1081
1082         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
1083
1084         if (tgt_is_multimodrpcs_client(exp)) {
1085                 /* Set MULTI RPCS incompatibility flag to prevent previous
1086                  * Lustre versions to mount a target with reply_data file */
1087                 if (!(tgt->lut_lsd.lsd_feature_incompat &
1088                       OBD_INCOMPAT_MULTI_RPCS)) {
1089                         tgt->lut_lsd.lsd_feature_incompat |=
1090                                                         OBD_INCOMPAT_MULTI_RPCS;
1091                         rc = tgt_server_data_update(env, tgt, 1);
1092                         if (rc < 0) {
1093                                 CERROR("%s: unable to set MULTI RPCS "
1094                                        "incompatibility flag\n",
1095                                        exp->exp_obd->obd_name);
1096                                 RETURN(rc);
1097                         }
1098                 }
1099
1100                 /* assign client slot generation */
1101                 ted->ted_lcd->lcd_generation =
1102                                 atomic_inc_return(&tgt->lut_client_generation);
1103         } else {
1104                 ted->ted_lcd->lcd_generation = 0;
1105         }
1106
1107         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s' "
1108                "generation %d\n",
1109                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1110                ted->ted_lcd->lcd_uuid, ted->ted_lcd->lcd_generation);
1111
1112         if (CFS_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
1113                 RETURN(-ENOSPC);
1114
1115         rc = tgt_client_data_update(env, exp);
1116         if (rc) {
1117                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
1118                        tgt->lut_obd->obd_name, idx, rc);
1119                 RETURN(rc);
1120         }
1121
1122         if (tgt_is_multimodrpcs_client(exp))
1123                 atomic_inc(&tgt->lut_num_clients);
1124
1125         RETURN(0);
1126 }
1127 EXPORT_SYMBOL(tgt_client_new);
1128
1129 /* Add an existing client to the MDS in-memory state based on
1130  * a client that was previously found in the last_rcvd file and
1131  * already has an assigned slot (idx >= 0).
1132  *
1133  * It should not be possible to fail adding an existing client - otherwise
1134  * mdt_init_server_data() callsite needs to be fixed.
1135  */
1136 int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
1137 {
1138         struct tg_export_data   *ted = &exp->exp_target_data;
1139         struct lu_target        *tgt = class_exp2tgt(exp);
1140
1141         ENTRY;
1142
1143         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
1144         LASSERTF(idx >= 0, "%d\n", idx);
1145
1146         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
1147             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1148                 RETURN(0);
1149
1150         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1151                 CERROR("%s: client %d: bit already set in bitmap!!\n",
1152                        tgt->lut_obd->obd_name,  idx);
1153                 LBUG();
1154         }
1155
1156         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added, "
1157                "generation %d\n",
1158                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid,
1159                ted->ted_lcd->lcd_generation);
1160
1161         ted->ted_lr_idx = idx;
1162         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1163                           idx * tgt->lut_lsd.lsd_client_size;
1164
1165         mutex_init(&ted->ted_lcd_lock);
1166
1167         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
1168
1169         RETURN(0);
1170 }
1171
1172 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
1173 {
1174         struct tg_export_data   *ted = &exp->exp_target_data;
1175         struct lu_target        *tgt = class_exp2tgt(exp);
1176         int                      rc;
1177
1178         ENTRY;
1179
1180         LASSERT(ted->ted_lcd);
1181
1182         if (unlikely(tgt == NULL)) {
1183                 CDEBUG(D_ERROR, "%s: No target for connected export\n",
1184                        class_exp2obd(exp)->obd_name);
1185                 RETURN(-EINVAL);
1186         }
1187
1188         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
1189         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
1190                     (char *)tgt->lut_obd->obd_uuid.uuid) ||
1191             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT ||
1192             exp->exp_no_recovery)
1193                 RETURN(0);
1194
1195         /* Slot may be not yet assigned, use case is race between Client
1196          * reconnect and forced eviction */
1197         if (ted->ted_lr_idx < 0) {
1198                 CWARN("%s: client with UUID '%s' not in bitmap\n",
1199                       tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid);
1200                 RETURN(0);
1201         }
1202
1203         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
1204                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1205                ted->ted_lcd->lcd_uuid);
1206
1207         /* Clear the bit _after_ zeroing out the client so we don't
1208            race with filter_client_add and zero out new clients.*/
1209         if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
1210                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
1211                        tgt->lut_obd->obd_name, ted->ted_lr_idx);
1212                 LBUG();
1213         }
1214
1215         /* Do not erase record for recoverable client. */
1216         if (exp->exp_flags & OBD_OPT_FAILOVER)
1217                 RETURN(0);
1218
1219         if (CFS_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_DEL))
1220                 RETURN(0);
1221
1222         /* Make sure the server's last_transno is up to date.
1223          * This should be done before zeroing client slot so last_transno will
1224          * be in server data or in client data in case of failure */
1225         rc = tgt_server_data_update(env, tgt, 0);
1226         if (rc != 0) {
1227                 CERROR("%s: failed to update server data, skip client %s "
1228                        "zeroing, rc %d\n", tgt->lut_obd->obd_name,
1229                        ted->ted_lcd->lcd_uuid, rc);
1230                 RETURN(rc);
1231         }
1232
1233         /* Race between an eviction and a disconnection ?*/
1234         mutex_lock(&ted->ted_lcd_lock);
1235         if (ted->ted_lcd->lcd_uuid[0] == '\0') {
1236                 mutex_unlock(&ted->ted_lcd_lock);
1237                 RETURN(rc);
1238         }
1239
1240         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
1241         mutex_unlock(&ted->ted_lcd_lock);
1242
1243         rc = tgt_client_data_update(env, exp);
1244
1245         if (!rc && tgt_is_multimodrpcs_record(tgt, ted->ted_lcd))
1246                 atomic_dec(&tgt->lut_num_clients);
1247
1248         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
1249                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
1250                tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
1251                ted->ted_lr_idx, ted->ted_lr_off, rc);
1252         RETURN(rc);
1253 }
1254 EXPORT_SYMBOL(tgt_client_del);
1255
1256 static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
1257 {
1258         struct tg_export_data   *ted = &exp->exp_target_data;
1259         struct lu_target        *lut = class_exp2tgt(exp);
1260         struct tg_reply_data    *trd, *tmp;
1261
1262         if (tag == 0)
1263                 return;
1264
1265         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
1266                 if (trd->trd_tag != tag)
1267                         continue;
1268
1269                 LASSERT(ergo(tgt_is_increasing_xid_client(exp),
1270                              trd->trd_reply.lrd_xid <= xid));
1271
1272                 ted->ted_release_tag++;
1273                 tgt_release_reply_data(lut, ted, trd);
1274         }
1275 }
1276
1277 static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
1278                        struct tg_export_data *ted, struct tg_reply_data *trd,
1279                        struct ptlrpc_request *req,
1280                        struct thandle *th, bool update_lrd_file)
1281 {
1282         struct tgt_session_info *tsi = NULL;
1283         struct lsd_reply_data *lrd;
1284         int i = -1;
1285         int rc;
1286
1287         lrd = &trd->trd_reply;
1288         /* update export last transno */
1289         mutex_lock(&ted->ted_lcd_lock);
1290         if (lrd->lrd_transno > ted->ted_lcd->lcd_last_transno)
1291                 ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
1292         mutex_unlock(&ted->ted_lcd_lock);
1293
1294         if (!tgt) {
1295                 trd->trd_index = TRD_INDEX_MEMORY;
1296                 GOTO(add_reply_data, rc = 0);
1297         }
1298
1299         if (env) {
1300                 tsi = tgt_ses_info(env);
1301                 if (tsi->tsi_batch_trd) {
1302                         LASSERT(tsi->tsi_batch_env);
1303                         trd = tsi->tsi_batch_trd;
1304                         i = trd->trd_index;
1305                 }
1306         }
1307
1308         if (i == -1) {
1309                 /* find a empty slot */
1310                 i = tgt_find_free_reply_slot(tgt);
1311                 if (unlikely(i < 0)) {
1312                         CERROR("%s: couldn't find a slot for reply data: rc = %d\n",
1313                                tgt_name(tgt), i);
1314                         RETURN(i);
1315                 }
1316                 trd->trd_index = i;
1317         }
1318
1319         if (update_lrd_file) {
1320                 loff_t  off;
1321
1322                 /* write reply data to disk */
1323                 off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
1324                 rc = tgt_reply_data_write(env, tgt, lrd, off, th);
1325                 if (unlikely(rc != 0)) {
1326                         CERROR("%s: can't update %s file: rc = %d\n",
1327                                tgt_name(tgt), REPLY_DATA, rc);
1328                         GOTO(free_slot, rc);
1329                 }
1330         }
1331
1332 add_reply_data:
1333         /* add reply data to target export's reply list */
1334         mutex_lock(&ted->ted_lcd_lock);
1335         if (req != NULL) {
1336                 int exclude = tgt_is_increasing_xid_client(req->rq_export) ?
1337                               MSG_REPLAY : MSG_REPLAY|MSG_RESENT;
1338
1339                 if (req->rq_obsolete) {
1340                         CDEBUG(D_INFO,
1341                                "drop reply data update for obsolete req xid=%llu,"
1342                                "transno=%llu, tag=%hu\n", req->rq_xid,
1343                                lrd->lrd_transno, trd->trd_tag);
1344                         mutex_unlock(&ted->ted_lcd_lock);
1345                         GOTO(free_slot, rc = -EBADR);
1346                 }
1347
1348                 if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude) &&
1349                     !(tsi && tsi->tsi_batch_env &&
1350                       trd->trd_reply.lrd_batch_idx > 0))
1351                         tgt_clean_by_tag(req->rq_export, req->rq_xid,
1352                                          trd->trd_tag);
1353         }
1354
1355         /*
1356          * For the batched RPC, all sub requests use one common @trd for the
1357          * reply data.
1358          */
1359         if (list_empty(&trd->trd_list)) {
1360                 list_add(&trd->trd_list, &ted->ted_reply_list);
1361                 ted->ted_reply_cnt++;
1362                 if (ted->ted_reply_cnt > ted->ted_reply_max)
1363                         ted->ted_reply_max = ted->ted_reply_cnt;
1364         }
1365         mutex_unlock(&ted->ted_lcd_lock);
1366
1367         CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
1368                "tag %hu, client gen %u, slot idx %d\n",
1369                trd, lrd->lrd_xid, lrd->lrd_transno,
1370                trd->trd_tag, lrd->lrd_client_gen, trd->trd_index);
1371
1372         RETURN(0);
1373
1374 free_slot:
1375         if (tgt != NULL)
1376                 tgt_clear_reply_slot(tgt, trd->trd_index);
1377         return rc;
1378 }
1379
1380 int tgt_mk_reply_data(const struct lu_env *env,
1381                       struct lu_target *tgt,
1382                       struct tg_export_data *ted,
1383                       struct ptlrpc_request *req,
1384                       __u64 opdata,
1385                       struct thandle *th,
1386                       bool write_update,
1387                       __u64 transno)
1388 {
1389         struct tg_reply_data *trd = NULL;
1390         struct lsd_reply_data *lrd;
1391         __u64 *pre_versions = NULL;
1392         struct tgt_session_info *tsi = NULL;
1393         int rc;
1394
1395         if (env != NULL) {
1396                 tsi = tgt_ses_info(env);
1397                 if (tsi->tsi_batch_trd) {
1398                         LASSERT(tsi->tsi_batch_env);
1399                         trd = tsi->tsi_batch_trd;
1400                 }
1401         }
1402
1403         if (trd == NULL) {
1404                 OBD_ALLOC_PTR(trd);
1405                 if (unlikely(trd == NULL))
1406                         RETURN(-ENOMEM);
1407
1408                 INIT_LIST_HEAD(&trd->trd_list);
1409         }
1410
1411         /* fill reply data information */
1412         lrd = &trd->trd_reply;
1413         lrd->lrd_transno = transno;
1414         if (tsi && tsi->tsi_batch_env) {
1415                 if (tsi->tsi_batch_idx == 0) {
1416                         LASSERT(req != NULL);
1417                         tsi->tsi_batch_trd = trd;
1418                         trd->trd_index = -1;
1419                         lrd->lrd_xid = req->rq_xid;
1420                         trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
1421                         lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
1422                 }
1423                 lrd->lrd_batch_idx = tsi->tsi_batch_idx;
1424         } else if (req != NULL) {
1425                 lrd->lrd_xid = req->rq_xid;
1426                 trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
1427                 lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
1428                 if (write_update) {
1429                         pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1430                         lrd->lrd_result = th->th_result;
1431                 }
1432         } else {
1433                 LASSERT(env != NULL);
1434                 LASSERT(tsi->tsi_xid != 0);
1435
1436                 lrd->lrd_xid = tsi->tsi_xid;
1437                 lrd->lrd_result = tsi->tsi_result;
1438                 lrd->lrd_client_gen = tsi->tsi_client_gen;
1439         }
1440
1441         lrd->lrd_data = opdata;
1442         if (pre_versions) {
1443                 trd->trd_pre_versions[0] = pre_versions[0];
1444                 trd->trd_pre_versions[1] = pre_versions[1];
1445                 trd->trd_pre_versions[2] = pre_versions[2];
1446                 trd->trd_pre_versions[3] = pre_versions[3];
1447         }
1448
1449         if (tsi && tsi->tsi_open_obj)
1450                 trd->trd_object = *lu_object_fid(&tsi->tsi_open_obj->do_lu);
1451
1452         rc = tgt_add_reply_data(env, tgt, ted, trd, req,
1453                                 th, write_update);
1454         if (rc < 0) {
1455                 OBD_FREE_PTR(trd);
1456                 if (rc == -EBADR)
1457                         rc = 0;
1458         }
1459         return rc;
1460
1461 }
1462 EXPORT_SYMBOL(tgt_mk_reply_data);
1463
1464 /*
1465  * last_rcvd & last_committed update callbacks
1466  */
1467 static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
1468                                 struct dt_object *obj, __u64 opdata,
1469                                 struct thandle *th, struct ptlrpc_request *req)
1470 {
1471         struct tgt_thread_info  *tti = tgt_th_info(env);
1472         struct tgt_session_info *tsi = tgt_ses_info(env);
1473         struct obd_export *exp = tsi->tsi_exp;
1474         struct tg_export_data *ted;
1475         __u64 *transno_p;
1476         bool nolcd = false;
1477         int rc = 0;
1478
1479         ENTRY;
1480
1481
1482         LASSERT(exp != NULL);
1483         ted = &exp->exp_target_data;
1484
1485         /* Some clients don't support recovery, and they don't have last_rcvd
1486          * client data:
1487          * 1. lightweight clients.
1488          * 2. local clients on MDS which doesn't enable "localrecov".
1489          * 3. OFD connect may cause transaction before export has last_rcvd
1490          *    slot.
1491          */
1492         if (ted->ted_lr_idx < 0)
1493                 nolcd = true;
1494
1495         if (req != NULL)
1496                 tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1497         else
1498                 /* From update replay, tti_transno should be set already */
1499                 LASSERT(tti->tti_transno != 0);
1500
1501         spin_lock(&tgt->lut_translock);
1502         if (th->th_result != 0) {
1503                 if (tti->tti_transno != 0) {
1504                         CERROR("%s: replay transno %llu failed: rc = %d\n",
1505                                tgt_name(tgt), tti->tti_transno, th->th_result);
1506                 }
1507         } else if (tti->tti_transno == 0) {
1508                 tti->tti_transno = ++tgt->lut_last_transno;
1509         } else {
1510                 /* should be replay */
1511                 if (tti->tti_transno > tgt->lut_last_transno)
1512                         tgt->lut_last_transno = tti->tti_transno;
1513         }
1514         spin_unlock(&tgt->lut_translock);
1515
1516         /** VBR: set new versions */
1517         if (th->th_result == 0 && obj != NULL) {
1518                 struct dt_object *dto = dt_object_locate(obj, th->th_dev);
1519                 dt_version_set(env, dto, tti->tti_transno, th);
1520         }
1521
1522         /* filling reply data */
1523         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1524                tti->tti_transno, tgt->lut_obd->obd_last_committed);
1525
1526         if (req != NULL) {
1527                 req->rq_transno = tti->tti_transno;
1528                 lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
1529         }
1530
1531         /* if can't add callback, do sync write */
1532         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp, tti->tti_transno);
1533
1534         if (nolcd) {
1535                 /* store transno in the last_rcvd header */
1536                 spin_lock(&tgt->lut_translock);
1537                 if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
1538                         tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
1539                         spin_unlock(&tgt->lut_translock);
1540                         /* Although current connection doesn't have slot
1541                          * in the last_rcvd, we still want to maintain
1542                          * the in-memory lsd_client_data structure in order to
1543                          * properly handle reply reconstruction. */
1544                         rc = tgt_server_data_write(env, tgt, th);
1545                 } else {
1546                         spin_unlock(&tgt->lut_translock);
1547                 }
1548         } else if (ted->ted_lr_off == 0) {
1549                 CERROR("%s: client idx %d has offset %lld\n",
1550                        tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
1551                 RETURN(-EINVAL);
1552         }
1553
1554         /* Target that supports multiple reply data */
1555         if (tgt_is_multimodrpcs_client(exp)) {
1556                 return tgt_mk_reply_data(env, tgt, ted, req, opdata, th,
1557                                          !!(req != NULL), tti->tti_transno);
1558         }
1559
1560         /* Enough for update replay, let's return */
1561         if (req == NULL)
1562                 RETURN(rc);
1563
1564         mutex_lock(&ted->ted_lcd_lock);
1565         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1566         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
1567                 transno_p = &ted->ted_lcd->lcd_last_close_transno;
1568                 ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
1569                 ted->ted_lcd->lcd_last_close_result = th->th_result;
1570         } else {
1571                 /* VBR: save versions in last_rcvd for reconstruct. */
1572                 __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1573
1574                 if (pre_versions) {
1575                         ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
1576                         ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
1577                         ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
1578                         ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
1579                 }
1580                 transno_p = &ted->ted_lcd->lcd_last_transno;
1581                 ted->ted_lcd->lcd_last_xid = req->rq_xid;
1582                 ted->ted_lcd->lcd_last_result = th->th_result;
1583                 /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
1584                  * see struct ldlm_reply->lock_policy_res1; */
1585                 ted->ted_lcd->lcd_last_data = opdata;
1586         }
1587
1588         /* Update transno in slot only if non-zero number, i.e. no errors */
1589         if (likely(tti->tti_transno != 0)) {
1590                 /* Don't overwrite bigger transaction number with lower one.
1591                  * That is not sign of problem in all cases, but in any case
1592                  * this value should be monotonically increased only. */
1593                 if (*transno_p > tti->tti_transno) {
1594                         if (!tgt->lut_no_reconstruct) {
1595                                 CERROR("%s: trying to overwrite bigger transno:"
1596                                        "on-disk: %llu, new: %llu replay: "
1597                                        "%d. See LU-617.\n", tgt_name(tgt),
1598                                        *transno_p, tti->tti_transno,
1599                                        req_is_replay(req));
1600                                 if (req_is_replay(req)) {
1601                                         spin_lock(&req->rq_export->exp_lock);
1602                                         req->rq_export->exp_vbr_failed = 1;
1603                                         spin_unlock(&req->rq_export->exp_lock);
1604                                 }
1605                                 mutex_unlock(&ted->ted_lcd_lock);
1606                                 RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
1607                         }
1608                 } else {
1609                         *transno_p = tti->tti_transno;
1610                 }
1611         }
1612
1613         if (!nolcd) {
1614                 tti->tti_off = ted->ted_lr_off;
1615                 if (CFS_FAIL_CHECK(OBD_FAIL_TGT_RCVD_EIO))
1616                         rc = -EIO;
1617                 else
1618                         rc = tgt_client_data_write(env, tgt, ted->ted_lcd,
1619                                                    &tti->tti_off, th);
1620                 if (rc < 0) {
1621                         mutex_unlock(&ted->ted_lcd_lock);
1622                         RETURN(rc);
1623                 }
1624         }
1625         mutex_unlock(&ted->ted_lcd_lock);
1626         RETURN(rc);
1627 }
1628
1629 /*
1630  * last_rcvd update for echo client simulation.
1631  * It updates last_rcvd client slot and version of object in
1632  * simple way but with all locks to simulate all drawbacks
1633  */
1634 static int tgt_last_rcvd_update_echo(const struct lu_env *env,
1635                                      struct lu_target *tgt,
1636                                      struct dt_object *obj,
1637                                      struct thandle *th,
1638                                      struct obd_export *exp)
1639 {
1640         struct tgt_thread_info  *tti = tgt_th_info(env);
1641         struct tg_export_data   *ted = &exp->exp_target_data;
1642         int                      rc = 0;
1643
1644         ENTRY;
1645
1646         tti->tti_transno = 0;
1647
1648         spin_lock(&tgt->lut_translock);
1649         if (th->th_result == 0)
1650                 tti->tti_transno = ++tgt->lut_last_transno;
1651         spin_unlock(&tgt->lut_translock);
1652
1653         /** VBR: set new versions */
1654         if (th->th_result == 0 && obj != NULL)
1655                 dt_version_set(env, obj, tti->tti_transno, th);
1656
1657         /* if can't add callback, do sync write */
1658         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
1659                                                 tti->tti_transno);
1660
1661         LASSERT(ted->ted_lr_off > 0);
1662
1663         mutex_lock(&ted->ted_lcd_lock);
1664         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1665         ted->ted_lcd->lcd_last_transno = tti->tti_transno;
1666         ted->ted_lcd->lcd_last_result = th->th_result;
1667
1668         tti->tti_off = ted->ted_lr_off;
1669         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
1670         mutex_unlock(&ted->ted_lcd_lock);
1671         RETURN(rc);
1672 }
1673
1674 static int tgt_clients_data_init(const struct lu_env *env,
1675                                  struct lu_target *tgt,
1676                                  unsigned long last_size)
1677 {
1678         struct obd_device       *obd = tgt->lut_obd;
1679         struct lr_server_data   *lsd = &tgt->lut_lsd;
1680         struct lsd_client_data  *lcd = NULL;
1681         struct tg_export_data   *ted;
1682         int                      cl_idx;
1683         int                      rc = 0;
1684         loff_t                   off = lsd->lsd_client_start;
1685         __u32                    generation = 0;
1686         struct cfs_hash         *hash = NULL;
1687
1688         ENTRY;
1689
1690         if (tgt->lut_bottom->dd_rdonly)
1691                 RETURN(0);
1692
1693         BUILD_BUG_ON(offsetof(struct lsd_client_data, lcd_padding) +
1694                      sizeof(lcd->lcd_padding) != LR_CLIENT_SIZE);
1695
1696         OBD_ALLOC_PTR(lcd);
1697         if (lcd == NULL)
1698                 RETURN(-ENOMEM);
1699
1700         hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
1701         if (hash == NULL)
1702                 GOTO(err_out, rc = -ENODEV);
1703
1704         for (cl_idx = 0; off < last_size; cl_idx++) {
1705                 struct obd_export       *exp;
1706                 __u64                    last_transno;
1707
1708                 /* Don't assume off is incremented properly by
1709                  * read_record(), in case sizeof(*lcd)
1710                  * isn't the same as fsd->lsd_client_size.  */
1711                 off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
1712                 rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
1713                 if (rc) {
1714                         CERROR("%s: error reading last_rcvd %s idx %d off "
1715                                "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
1716                                cl_idx, off, rc);
1717                         rc = 0;
1718                         break; /* read error shouldn't cause startup to fail */
1719                 }
1720
1721                 if (lcd->lcd_uuid[0] == '\0') {
1722                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
1723                                cl_idx);
1724                         continue;
1725                 }
1726
1727                 last_transno = lcd_last_transno(lcd);
1728
1729                 /* These exports are cleaned up by disconnect, so they
1730                  * need to be set up like real exports as connect does.
1731                  */
1732                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: %llu"
1733                        " srv lr: %llu lx: %llu gen %u\n", lcd->lcd_uuid,
1734                        cl_idx, last_transno, lsd->lsd_last_transno,
1735                        lcd_last_xid(lcd), lcd->lcd_generation);
1736
1737                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
1738                 if (IS_ERR(exp)) {
1739                         if (PTR_ERR(exp) == -EALREADY) {
1740                                 /* export already exists, zero out this one */
1741                                 CERROR("%s: Duplicate export %s!\n",
1742                                        tgt_name(tgt), lcd->lcd_uuid);
1743                                 continue;
1744                         }
1745                         GOTO(err_out, rc = PTR_ERR(exp));
1746                 }
1747
1748                 ted = &exp->exp_target_data;
1749                 *ted->ted_lcd = *lcd;
1750
1751                 rc = tgt_client_add(env, exp, cl_idx);
1752                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
1753                 /* VBR: set export last committed version */
1754                 exp->exp_last_committed = last_transno;
1755                 spin_lock(&exp->exp_lock);
1756                 exp->exp_connecting = 0;
1757                 exp->exp_in_recovery = 0;
1758                 spin_unlock(&exp->exp_lock);
1759                 atomic_inc(&obd->obd_max_recoverable_clients);
1760
1761                 if (tgt_is_multimodrpcs_record(tgt, lcd)) {
1762                         atomic_inc(&tgt->lut_num_clients);
1763
1764                         /* compute the highest valid client generation */
1765                         generation = max(generation, lcd->lcd_generation);
1766                         /* fill client_generation <-> export hash table */
1767                         rc = cfs_hash_add_unique(hash, &lcd->lcd_generation,
1768                                                  &exp->exp_gen_hash);
1769                         if (rc != 0) {
1770                                 CERROR("%s: duplicate export for client "
1771                                        "generation %u\n",
1772                                        tgt_name(tgt), lcd->lcd_generation);
1773                                 class_export_put(exp);
1774                                 GOTO(err_out, rc);
1775                         }
1776                 }
1777
1778                 class_export_put(exp);
1779
1780                 rc = rev_import_init(exp);
1781                 if (rc != 0) {
1782                         class_unlink_export(exp);
1783                         GOTO(err_out, rc);
1784                 }
1785
1786                 /* Need to check last_rcvd even for duplicated exports. */
1787                 CDEBUG(D_OTHER, "client at idx %d has last_transno = %llu\n",
1788                        cl_idx, last_transno);
1789
1790                 spin_lock(&tgt->lut_translock);
1791                 tgt->lut_last_transno = max(last_transno,
1792                                             tgt->lut_last_transno);
1793                 spin_unlock(&tgt->lut_translock);
1794         }
1795
1796         /* record highest valid client generation */
1797         atomic_set(&tgt->lut_client_generation, generation);
1798
1799 err_out:
1800         if (hash != NULL)
1801                 cfs_hash_putref(hash);
1802         OBD_FREE_PTR(lcd);
1803         RETURN(rc);
1804 }
1805
1806 struct server_compat_data {
1807         __u32 rocompat;
1808         __u32 incompat;
1809         __u32 rocinit;
1810         __u32 incinit;
1811 };
1812
1813 static struct server_compat_data tgt_scd[] = {
1814         [LDD_F_SV_TYPE_MDT] = {
1815                 .rocompat = OBD_ROCOMPAT_LOVOBJID,
1816                 .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1817                             OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
1818                             OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI |
1819                             OBD_INCOMPAT_MULTI_RPCS,
1820                 .rocinit = OBD_ROCOMPAT_LOVOBJID,
1821                 .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1822                            OBD_INCOMPAT_MULTI_OI,
1823         },
1824         [LDD_F_SV_TYPE_OST] = {
1825                 .rocompat = OBD_ROCOMPAT_IDX_IN_IDIF,
1826                 .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
1827                             OBD_INCOMPAT_FID,
1828                 .rocinit = OBD_ROCOMPAT_IDX_IN_IDIF,
1829                 .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
1830         }
1831 };
1832
1833 int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
1834 {
1835         struct tgt_thread_info          *tti = tgt_th_info(env);
1836         struct lr_server_data           *lsd = &tgt->lut_lsd;
1837         unsigned long                    last_rcvd_size;
1838         __u32                            index;
1839         int                              rc, type;
1840
1841         rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr);
1842         if (rc)
1843                 RETURN(rc);
1844
1845         last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
1846
1847         /* ensure padding in the struct is the correct size */
1848         BUILD_BUG_ON(offsetof(struct lr_server_data, lsd_padding) +
1849                      sizeof(lsd->lsd_padding) != LR_SERVER_SIZE);
1850
1851         rc = server_name2index(tgt_name(tgt), &index, NULL);
1852         if (rc < 0) {
1853                 CERROR("%s: Can not get index from name: rc = %d\n",
1854                        tgt_name(tgt), rc);
1855                 RETURN(rc);
1856         }
1857         /* server_name2index() returns type */
1858         type = rc;
1859         if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
1860                 CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
1861                 RETURN(-EINVAL);
1862         }
1863
1864         /* last_rcvd on OST doesn't provide reconstruct support because there
1865          * may be up to 8 in-flight write requests per single slot in
1866          * last_rcvd client data
1867          */
1868         tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
1869
1870         if (last_rcvd_size == 0) {
1871                 LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
1872
1873                 memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
1874                        sizeof(lsd->lsd_uuid));
1875                 lsd->lsd_last_transno = 0;
1876                 lsd->lsd_mount_count = 0;
1877                 lsd->lsd_server_size = LR_SERVER_SIZE;
1878                 lsd->lsd_client_start = LR_CLIENT_START;
1879                 lsd->lsd_client_size = LR_CLIENT_SIZE;
1880                 lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
1881                 lsd->lsd_osd_index = index;
1882                 lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
1883                 lsd->lsd_feature_incompat = tgt_scd[type].incinit;
1884         } else {
1885                 rc = tgt_server_data_read(env, tgt);
1886                 if (rc) {
1887                         CERROR("%s: error reading LAST_RCVD: rc= %d\n",
1888                                tgt_name(tgt), rc);
1889                         RETURN(rc);
1890                 }
1891                 if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
1892                         if (tgt->lut_bottom->dd_rdonly) {
1893                                 /* Such difference may be caused by mounting
1894                                  * up snapshot with new fsname under rd_only
1895                                  * mode. But even if it was NOT, it will not
1896                                  * damage the system because of "rd_only". */
1897                                 memcpy(lsd->lsd_uuid,
1898                                        tgt->lut_obd->obd_uuid.uuid,
1899                                        sizeof(lsd->lsd_uuid));
1900                         } else {
1901                                 LCONSOLE_ERROR_MSG(0x157, "Trying to start "
1902                                                    "OBD %s using the wrong "
1903                                                    "disk %s. Were the /dev/ "
1904                                                    "assignments rearranged?\n",
1905                                                    tgt->lut_obd->obd_uuid.uuid,
1906                                                    lsd->lsd_uuid);
1907                                 RETURN(-EINVAL);
1908                         }
1909                 }
1910
1911                 if (lsd->lsd_osd_index != index) {
1912                         LCONSOLE_ERROR_MSG(0x157,
1913                                            "%s: index %d in last rcvd is different with the index %d in config log, It might be disk corruption!\n",
1914                                            tgt_name(tgt),
1915                                            lsd->lsd_osd_index, index);
1916                         RETURN(-EINVAL);
1917                 }
1918         }
1919
1920         if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
1921                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
1922                        tgt_name(tgt),
1923                        lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
1924                 RETURN(-EINVAL);
1925         }
1926
1927         if (type == LDD_F_SV_TYPE_MDT)
1928                 lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
1929
1930         if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
1931                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
1932                        tgt_name(tgt),
1933                        lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
1934                 RETURN(-EINVAL);
1935         }
1936         /** Interop: evict all clients at first boot with 1.8 last_rcvd */
1937         if (type == LDD_F_SV_TYPE_MDT &&
1938             !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
1939                 if (last_rcvd_size > lsd->lsd_client_start) {
1940                         LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
1941                                       "remove all clients for interop needs\n",
1942                                       tgt_name(tgt));
1943                         rc = tgt_truncate_object(env, tgt, tgt->lut_last_rcvd,
1944                                                  lsd->lsd_client_start);
1945                         if (rc)
1946                                 RETURN(rc);
1947                         last_rcvd_size = lsd->lsd_client_start;
1948                 }
1949                 /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
1950                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
1951         }
1952
1953         spin_lock(&tgt->lut_translock);
1954         tgt->lut_last_transno = lsd->lsd_last_transno;
1955         spin_unlock(&tgt->lut_translock);
1956
1957         lsd->lsd_mount_count++;
1958
1959         CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
1960         CDEBUG(D_INODE, "%s: server last_transno: %llu\n",
1961                tgt_name(tgt), tgt->lut_last_transno);
1962         CDEBUG(D_INODE, "%s: server mount_count: %llu\n",
1963                tgt_name(tgt), lsd->lsd_mount_count);
1964         CDEBUG(D_INODE, "%s: server data size: %u\n",
1965                tgt_name(tgt), lsd->lsd_server_size);
1966         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
1967                tgt_name(tgt), lsd->lsd_client_start);
1968         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
1969                tgt_name(tgt), lsd->lsd_client_size);
1970         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
1971                tgt_name(tgt), last_rcvd_size);
1972         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
1973                tgt_name(tgt), lsd->lsd_subdir_count);
1974         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
1975                last_rcvd_size <= lsd->lsd_client_start ? 0 :
1976                (last_rcvd_size - lsd->lsd_client_start) /
1977                 lsd->lsd_client_size);
1978         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
1979
1980         if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
1981             lsd->lsd_client_size == 0) {
1982                 CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
1983                 RETURN(-EINVAL);
1984         }
1985
1986         if (!tgt->lut_obd->obd_replayable)
1987                 CWARN("%s: recovery support OFF\n", tgt_name(tgt));
1988
1989         rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
1990         if (rc < 0)
1991                 GOTO(err_client, rc);
1992
1993         spin_lock(&tgt->lut_translock);
1994         /* obd_last_committed is used for compatibility
1995          * with other lustre recovery code */
1996         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
1997         spin_unlock(&tgt->lut_translock);
1998
1999         obd2obt(tgt->lut_obd)->obt_mount_count = lsd->lsd_mount_count;
2000         obd2obt(tgt->lut_obd)->obt_instance = (__u32)lsd->lsd_mount_count;
2001
2002         /* save it, so mount count and last_transno is current */
2003         rc = tgt_server_data_update(env, tgt, 0);
2004         if (rc < 0)
2005                 GOTO(err_client, rc);
2006
2007         RETURN(0);
2008
2009 err_client:
2010         class_disconnect_exports(tgt->lut_obd);
2011         return rc;
2012 }
2013
2014 /* add credits for last_rcvd update */
2015 int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
2016                      void *cookie)
2017 {
2018         struct lu_target        *tgt = cookie;
2019         struct tgt_session_info *tsi;
2020         struct tgt_thread_info  *tti = tgt_th_info(env);
2021         struct dt_object        *dto;
2022         int                      rc;
2023
2024         /* For readonly case, the caller should have got failure
2025          * when start the transaction. If the logic comes here,
2026          * there must be something wrong. */
2027         if (unlikely(tgt->lut_bottom->dd_rdonly)) {
2028                 dump_stack();
2029                 LBUG();
2030         }
2031
2032         /* if there is no session, then this transaction is not result of
2033          * request processing but some local operation */
2034         if (env->le_ses == NULL)
2035                 return 0;
2036
2037         LASSERT(tgt->lut_last_rcvd);
2038         tsi = tgt_ses_info(env);
2039         /* OFD may start transaction without export assigned */
2040         if (tsi->tsi_exp == NULL)
2041                 return 0;
2042
2043         if (tgt_is_multimodrpcs_client(tsi->tsi_exp)) {
2044                 /*
2045                  * Use maximum possible file offset for declaration to ensure
2046                  * ZFS will reserve enough credits for a write anywhere in this
2047                  * file, since we don't know where in the file the write will be
2048                  * because a replay slot has not been assigned.  This should be
2049                  * replaced by dmu_tx_hold_append() when available.
2050                  */
2051                 tti->tti_buf.lb_buf = NULL;
2052                 tti->tti_buf.lb_len = sizeof(struct lsd_reply_data);
2053                 dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
2054                 rc = dt_declare_record_write(env, dto, &tti->tti_buf, -1, th);
2055                 if (rc)
2056                         return rc;
2057         } else {
2058                 dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
2059                 tti_buf_lcd(tti);
2060                 tti->tti_off = tsi->tsi_exp->exp_target_data.ted_lr_off;
2061                 rc = dt_declare_record_write(env, dto, &tti->tti_buf,
2062                                              tti->tti_off, th);
2063                 if (rc)
2064                         return rc;
2065         }
2066
2067         if (tsi->tsi_vbr_obj != NULL &&
2068             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
2069                 dto = dt_object_locate(tsi->tsi_vbr_obj, th->th_dev);
2070                 rc = dt_declare_version_set(env, dto, th);
2071         }
2072
2073         return rc;
2074 }
2075
2076 /* Update last_rcvd records with latests transaction data */
2077 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
2078                     void *cookie)
2079 {
2080         struct lu_target        *tgt = cookie;
2081         struct tgt_session_info *tsi;
2082         struct tgt_thread_info  *tti = tgt_th_info(env);
2083         struct dt_object        *obj = NULL;
2084         int                      rc;
2085         bool                     echo_client;
2086
2087         if (env->le_ses == NULL)
2088                 return 0;
2089
2090         tsi = tgt_ses_info(env);
2091         /* OFD may start transaction without export assigned */
2092         if (tsi->tsi_exp == NULL)
2093                 return 0;
2094
2095         echo_client = (tgt_ses_req(tsi) == NULL && tsi->tsi_xid == 0);
2096
2097         if (tsi->tsi_has_trans && !echo_client && !tsi->tsi_batch_env) {
2098                 if (!tsi->tsi_mult_trans) {
2099                         CDEBUG(D_HA, "More than one transaction %llu\n",
2100                                tti->tti_transno);
2101                         /**
2102                          * if RPC handler sees unexpected multiple last_rcvd
2103                          * updates with transno, then it is better to return
2104                          * the latest transaction number to the client.
2105                          * In that case replay may fail if part of operation
2106                          * was committed and can't be re-applied easily. But
2107                          * that is better than report the first transno, in
2108                          * which case partially committed operation would be
2109                          * considered as finished so never replayed causing
2110                          * data loss.
2111                          */
2112                 }
2113                 /* we need new transno to be assigned */
2114                 tti->tti_transno = 0;
2115         }
2116
2117         if (!th->th_result)
2118                 tsi->tsi_has_trans++;
2119
2120         if (tsi->tsi_vbr_obj != NULL &&
2121             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
2122                 obj = tsi->tsi_vbr_obj;
2123         }
2124
2125         if (unlikely(echo_client)) /* echo client special case */
2126                 rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
2127                                                tsi->tsi_exp);
2128         else
2129                 rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
2130                                           tgt_ses_req(tsi));
2131         return rc;
2132 }
2133
2134 int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
2135 {
2136         struct tgt_thread_info  *tti = tgt_th_info(env);
2137         struct lsd_reply_data   *lrd = &tti->tti_lrd;
2138         unsigned long            reply_data_size;
2139         int                      rc;
2140         struct lsd_reply_header *lrh = NULL;
2141         struct tg_reply_data    *trd = NULL;
2142         int                      idx;
2143         loff_t                   off;
2144         struct cfs_hash         *hash = NULL;
2145         struct obd_export       *exp;
2146         struct tg_export_data   *ted;
2147         int                      reply_data_recovered = 0;
2148
2149         rc = dt_attr_get(env, tgt->lut_reply_data, &tti->tti_attr);
2150         if (rc)
2151                 GOTO(out, rc);
2152         reply_data_size = (unsigned long)tti->tti_attr.la_size;
2153
2154         OBD_ALLOC_PTR(lrh);
2155         if (lrh == NULL)
2156                 GOTO(out, rc = -ENOMEM);
2157
2158         if (reply_data_size == 0) {
2159                 CDEBUG(D_INFO, "%s: new reply_data file, initializing\n",
2160                        tgt_name(tgt));
2161                 lrh->lrh_magic = LRH_MAGIC;
2162                 lrh->lrh_header_size = sizeof(*lrh);
2163                 lrh->lrh_reply_size = sizeof(*lrd);
2164                 rc = tgt_reply_header_write(env, tgt, lrh);
2165                 if (rc) {
2166                         CERROR("%s: error writing %s: rc = %d\n",
2167                                tgt_name(tgt), REPLY_DATA, rc);
2168                         GOTO(out, rc);
2169                 }
2170         } else {
2171                 __u32 recsz = sizeof(*lrd);
2172                 const char *lrd_ver = "v2";
2173
2174                 rc = tgt_reply_header_read(env, tgt, lrh);
2175                 if (rc) {
2176                         CERROR("%s: error reading %s: rc = %d\n",
2177                                tgt_name(tgt), REPLY_DATA, rc);
2178                         GOTO(out, rc);
2179                 }
2180
2181                 switch (lrh->lrh_magic) {
2182 #if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 5, 53, 0)
2183                 /* The old reply_data is replaced on the first mount after
2184                  * an upgrade, so no need to keep this interop code forever.
2185                  */
2186                 case LRH_MAGIC_V1:
2187                         recsz = sizeof(struct lsd_reply_data_v1);
2188                         lrd_ver = "v1";
2189
2190                         CWARN("%s: %s v1 will be upgraded to new record size\n",
2191                               tgt_name(tgt), REPLY_DATA);
2192                         fallthrough;
2193 #endif
2194                 case LRH_MAGIC_V2:
2195                         if (lrh->lrh_header_size != sizeof(*lrh)) {
2196                                 CERROR("%s: bad %s %s header size: %u != %lu\n",
2197                                        tgt_name(tgt), REPLY_DATA, lrd_ver,
2198                                        lrh->lrh_header_size, sizeof(*lrh));
2199                                 GOTO(out, rc = -EINVAL);
2200                         }
2201                         if (lrh->lrh_reply_size != recsz) {
2202                                 CERROR("%s: bad %s %s reply size: %u != %u\n",
2203                                 tgt_name(tgt), REPLY_DATA, lrd_ver,
2204                                 lrh->lrh_reply_size, recsz);
2205                                 GOTO(out, rc = -EINVAL);
2206                         }
2207                         break;
2208                 default:
2209                         CERROR("%s: invalid %s magic: %x != %x/%x\n",
2210                                tgt_name(tgt), REPLY_DATA,
2211                                lrh->lrh_magic, LRH_MAGIC_V1, LRH_MAGIC_V2);
2212                         GOTO(out, rc = -EINVAL);
2213                 }
2214
2215                 hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
2216                 if (hash == NULL)
2217                         GOTO(out, rc = -ENODEV);
2218
2219                 OBD_ALLOC_PTR(trd);
2220                 if (trd == NULL)
2221                         GOTO(out, rc = -ENOMEM);
2222
2223                 /* Load reply_data from disk */
2224                 for (idx = 0, off = lrh->lrh_header_size;
2225                      off < reply_data_size; idx++, off += recsz) {
2226                         rc = tgt_reply_data_read(env, tgt, lrd, off,
2227                                                  lrh->lrh_magic);
2228                         if (rc) {
2229                                 CERROR("%s: error reading %s: rc = %d\n",
2230                                        tgt_name(tgt), REPLY_DATA, rc);
2231                                 GOTO(out, rc);
2232                         }
2233
2234                         exp = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
2235                         if (exp == NULL) {
2236                                 /* old reply data from a disconnected client */
2237                                 continue;
2238                         }
2239                         ted = &exp->exp_target_data;
2240                         mutex_lock(&ted->ted_lcd_lock);
2241
2242                         /* create in-memory reply_data and link it to
2243                          * target export's reply list */
2244                         rc = tgt_set_reply_slot(tgt, idx);
2245                         if (rc != 0) {
2246                                 mutex_unlock(&ted->ted_lcd_lock);
2247                                 GOTO(out, rc);
2248                         }
2249                         trd->trd_reply = *lrd;
2250                         trd->trd_pre_versions[0] = 0;
2251                         trd->trd_pre_versions[1] = 0;
2252                         trd->trd_pre_versions[2] = 0;
2253                         trd->trd_pre_versions[3] = 0;
2254                         trd->trd_index = idx;
2255                         trd->trd_tag = 0;
2256                         fid_zero(&trd->trd_object);
2257                         list_add(&trd->trd_list, &ted->ted_reply_list);
2258                         ted->ted_reply_cnt++;
2259                         if (ted->ted_reply_cnt > ted->ted_reply_max)
2260                                 ted->ted_reply_max = ted->ted_reply_cnt;
2261
2262                         CDEBUG(D_HA, "%s: restore reply %p: xid %llu, "
2263                                "transno %llu, client gen %u, slot idx %d\n",
2264                                tgt_name(tgt), trd, lrd->lrd_xid,
2265                                lrd->lrd_transno, lrd->lrd_client_gen,
2266                                trd->trd_index);
2267
2268                         /* update export last committed transation */
2269                         exp->exp_last_committed = max(exp->exp_last_committed,
2270                                                       lrd->lrd_transno);
2271                         /* Update lcd_last_transno as well for check in
2272                          * tgt_release_reply_data() or the latest client
2273                          * transno can be lost.
2274                          */
2275                         ted->ted_lcd->lcd_last_transno =
2276                                 max(ted->ted_lcd->lcd_last_transno,
2277                                     exp->exp_last_committed);
2278
2279                         mutex_unlock(&ted->ted_lcd_lock);
2280                         class_export_put(exp);
2281
2282                         /* update target last committed transaction */
2283                         spin_lock(&tgt->lut_translock);
2284                         tgt->lut_last_transno = max(tgt->lut_last_transno,
2285                                                     lrd->lrd_transno);
2286                         spin_unlock(&tgt->lut_translock);
2287
2288                         reply_data_recovered++;
2289
2290                         OBD_ALLOC_PTR(trd);
2291                         if (trd == NULL)
2292                                 GOTO(out, rc = -ENOMEM);
2293                 }
2294                 CDEBUG(D_INFO, "%s: %d reply data have been recovered\n",
2295                        tgt_name(tgt), reply_data_recovered);
2296         }
2297
2298         spin_lock(&tgt->lut_translock);
2299         /* obd_last_committed is used for compatibility
2300          * with other lustre recovery code */
2301         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
2302         spin_unlock(&tgt->lut_translock);
2303
2304         rc = 0;
2305
2306 out:
2307         if (hash != NULL)
2308                 cfs_hash_putref(hash);
2309         if (trd != NULL)
2310                 OBD_FREE_PTR(trd);
2311         if (lrh != NULL)
2312                 OBD_FREE_PTR(lrh);
2313         return rc;
2314 }
2315
2316 static int tgt_check_lookup_req(struct ptlrpc_request *req, int lookup,
2317                                 struct tg_reply_data *trd)
2318 {
2319         struct tg_export_data *ted = &req->rq_export->exp_target_data;
2320         struct lu_target *lut = class_exp2tgt(req->rq_export);
2321         __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
2322         int rc = 0;
2323         struct tg_reply_data *reply;
2324         bool check_increasing;
2325
2326         if (tag == 0)
2327                 return 0;
2328
2329         check_increasing = tgt_is_increasing_xid_client(req->rq_export) &&
2330                            !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2331         if (!lookup && !check_increasing)
2332                 return 0;
2333
2334         list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
2335                 if (lookup && reply->trd_reply.lrd_xid == req->rq_xid) {
2336                         rc = 1;
2337                         if (trd != NULL)
2338                                 *trd = *reply;
2339                         break;
2340                 } else if (check_increasing && reply->trd_tag == tag &&
2341                            reply->trd_reply.lrd_xid > req->rq_xid) {
2342                         rc = -EPROTO;
2343                         CERROR("%s: busy tag=%u req_xid=%llu, trd=%p: xid=%llu transno=%llu client_gen=%u slot_idx=%d: rc = %d\n",
2344                                tgt_name(lut), tag, req->rq_xid, trd,
2345                                reply->trd_reply.lrd_xid,
2346                                reply->trd_reply.lrd_transno,
2347                                reply->trd_reply.lrd_client_gen,
2348                                reply->trd_index, rc);
2349                         break;
2350                 }
2351         }
2352
2353         return rc;
2354 }
2355
2356 /* Look for a reply data matching specified request @req
2357  * A copy is returned in @trd if the pointer is not NULL
2358  */
2359 int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
2360 {
2361         struct tg_export_data *ted = &req->rq_export->exp_target_data;
2362         int found = 0;
2363         bool not_replay = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2364
2365         mutex_lock(&ted->ted_lcd_lock);
2366         if (not_replay && req->rq_xid <= req->rq_export->exp_last_xid) {
2367                 /* A check for the last_xid is needed here in case there is
2368                  * no reply data is left in the list. It may happen if another
2369                  * RPC on another slot increased the last_xid between our
2370                  * process_req_last_xid & tgt_lookup_reply calls */
2371                 found = -EPROTO;
2372         } else {
2373                 found = tgt_check_lookup_req(req, 1, trd);
2374         }
2375         mutex_unlock(&ted->ted_lcd_lock);
2376
2377         CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d last_xid %llu\n",
2378                tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, found,
2379                req->rq_export->exp_last_xid);
2380
2381         return found;
2382 }
2383 EXPORT_SYMBOL(tgt_lookup_reply);
2384
2385 int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
2386 {
2387         struct tg_export_data   *ted = &exp->exp_target_data;
2388         struct lu_target        *lut = class_exp2tgt(exp);
2389         struct tg_reply_data    *trd, *tmp;
2390
2391
2392         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
2393                 if (trd->trd_reply.lrd_xid > rcvd_xid)
2394                         continue;
2395                 ted->ted_release_xid++;
2396                 tgt_release_reply_data(lut, ted, trd);
2397         }
2398
2399         return 0;
2400 }
2401
2402 int tgt_handle_tag(struct ptlrpc_request *req)
2403 {
2404         return tgt_check_lookup_req(req, 0, NULL);
2405 }
2406