Whamcloud - gitweb
LU-14393 recovery: reply reconstruction for batched RPCs
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * Lustre Unified Target
32  * These are common functions to work with last_received file
33  *
34  * Author: Mikhail Pershin <mike.pershin@intel.com>
35  */
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_fid.h>
39
40 #include "tgt_internal.h"
41
/**
 * Version recovery epoch.
 *
 * Shift count separating the epoch from the rest of a 64-bit transno:
 * tgt_boot_epoch_update() computes the next epoch as
 * (lut_last_transno >> LR_EPOCH_BITS) + 1 and restarts lut_last_transno at
 * (epoch << LR_EPOCH_BITS).
 */
#define LR_EPOCH_BITS   32
44
45 /* Allocate a bitmap for a chunk of reply data slots */
46 static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
47 {
48         unsigned long *bm;
49
50         OBD_ALLOC_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
51                         sizeof(long));
52         if (bm == NULL)
53                 return -ENOMEM;
54
55         spin_lock(&lut->lut_client_bitmap_lock);
56
57         if (lut->lut_reply_bitmap[chunk] != NULL) {
58                 /* someone else already allocated the bitmap for this chunk */
59                 spin_unlock(&lut->lut_client_bitmap_lock);
60                 OBD_FREE_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
61                          sizeof(long));
62                 return 0;
63         }
64
65         lut->lut_reply_bitmap[chunk] = bm;
66
67         spin_unlock(&lut->lut_client_bitmap_lock);
68
69         return 0;
70 }
71
72 /* Look for an available reply data slot in the bitmap
73  * of the target @lut
74  * Allocate bitmap chunk when first used
75  * XXX algo could be improved if this routine limits performance
76  */
77 static int tgt_find_free_reply_slot(struct lu_target *lut)
78 {
79         unsigned long *bmp;
80         int chunk = 0;
81         int rc;
82         int b;
83
84         for (chunk = 0; chunk < LUT_REPLY_SLOTS_MAX_CHUNKS; chunk++) {
85                 /* allocate the bitmap chunk if necessary */
86                 if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
87                         rc = tgt_bitmap_chunk_alloc(lut, chunk);
88                         if (rc != 0)
89                                 return rc;
90                 }
91                 bmp = lut->lut_reply_bitmap[chunk];
92
93                 /* look for an available slot in this chunk */
94                 do {
95                         b = find_first_zero_bit(bmp, LUT_REPLY_SLOTS_PER_CHUNK);
96                         if (b >= LUT_REPLY_SLOTS_PER_CHUNK)
97                                 break;
98
99                         /* found one */
100                         if (test_and_set_bit(b, bmp) == 0)
101                                 return chunk * LUT_REPLY_SLOTS_PER_CHUNK + b;
102                 } while (true);
103         }
104
105         return -ENOSPC;
106 }
107
108 /* Mark the reply data slot @idx 'used' in the corresponding bitmap chunk
109  * of the target @lut
110  * Allocate the bitmap chunk if necessary
111  */
112 static int tgt_set_reply_slot(struct lu_target *lut, int idx)
113 {
114         int chunk;
115         int b;
116         int rc;
117
118         chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
119         b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
120
121         LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
122         LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
123
124         /* allocate the bitmap chunk if necessary */
125         if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
126                 rc = tgt_bitmap_chunk_alloc(lut, chunk);
127                 if (rc != 0)
128                         return rc;
129         }
130
131         /* mark the slot 'used' in this chunk */
132         if (test_and_set_bit(b, lut->lut_reply_bitmap[chunk]) != 0) {
133                 CERROR("%s: slot %d already set in bitmap\n",
134                        tgt_name(lut), idx);
135                 return -EALREADY;
136         }
137
138         return 0;
139 }
140
141
142 /* Mark the reply data slot @idx 'unused' in the corresponding bitmap chunk
143  * of the target @lut
144  */
145 static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
146 {
147         int chunk;
148         int b;
149
150         if (lut->lut_obd->obd_stopping)
151                 /*
152                  * in case of failover keep the bit set in order to
153                  * avoid overwriting slots in reply_data which might
154                  * be required by resent rpcs
155                  */
156                 return 0;
157         chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
158         b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
159
160         LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
161         LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
162
163         if (lut->lut_reply_bitmap[chunk] == NULL) {
164                 CERROR("%s: slot %d not allocated\n",
165                        tgt_name(lut), idx);
166                 return -ENOENT;
167         }
168
169         if (test_and_clear_bit(b, lut->lut_reply_bitmap[chunk]) == 0) {
170                 CERROR("%s: slot %d already clear in bitmap\n",
171                        tgt_name(lut), idx);
172                 return -EALREADY;
173         }
174
175         return 0;
176 }
177
178
179 /* Read header of reply_data file of target @tgt into structure @lrh */
180 static int tgt_reply_header_read(const struct lu_env *env,
181                                  struct lu_target *tgt,
182                                  struct lsd_reply_header *lrh)
183 {
184         int                      rc;
185         struct lsd_reply_header  buf;
186         struct tgt_thread_info  *tti = tgt_th_info(env);
187
188         tti->tti_off = 0;
189         tti->tti_buf.lb_buf = &buf;
190         tti->tti_buf.lb_len = sizeof(buf);
191
192         rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
193                             &tti->tti_off);
194         if (rc != 0)
195                 return rc;
196
197         lrh->lrh_magic = le32_to_cpu(buf.lrh_magic);
198         lrh->lrh_header_size = le32_to_cpu(buf.lrh_header_size);
199         lrh->lrh_reply_size = le32_to_cpu(buf.lrh_reply_size);
200
201         CDEBUG(D_HA, "%s: read %s header. magic=0x%08x "
202                "header_size=%d reply_size=%d\n",
203                 tgt->lut_obd->obd_name, REPLY_DATA,
204                 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
205
206         return 0;
207 }
208
209 /* Write header into replay_data file of target @tgt from structure @lrh */
210 static int tgt_reply_header_write(const struct lu_env *env,
211                                   struct lu_target *tgt,
212                                   struct lsd_reply_header *lrh)
213 {
214         int                      rc;
215         struct lsd_reply_header  buf;
216         struct tgt_thread_info  *tti = tgt_th_info(env);
217         struct thandle          *th;
218         struct dt_object        *dto;
219
220         CDEBUG(D_HA, "%s: write %s header. magic=0x%08x "
221                "header_size=%d reply_size=%d\n",
222                 tgt->lut_obd->obd_name, REPLY_DATA,
223                 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
224
225         if (tgt->lut_bottom->dd_rdonly)
226                 RETURN(0);
227
228         buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
229         buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
230         buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
231
232         th = dt_trans_create(env, tgt->lut_bottom);
233         if (IS_ERR(th))
234                 return PTR_ERR(th);
235         th->th_sync = 1;
236
237         tti->tti_off = 0;
238         tti->tti_buf.lb_buf = &buf;
239         tti->tti_buf.lb_len = sizeof(buf);
240
241         rc = dt_declare_record_write(env, tgt->lut_reply_data,
242                                      &tti->tti_buf, tti->tti_off, th);
243         if (rc)
244                 GOTO(out, rc);
245
246         rc = dt_trans_start(env, tgt->lut_bottom, th);
247         if (rc)
248                 GOTO(out, rc);
249
250         dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
251         rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
252 out:
253         dt_trans_stop(env, tgt->lut_bottom, th);
254         return rc;
255 }
256
257 /* Write the reply data @lrd into reply_data file of target @tgt
258  * at offset @off
259  */
260 static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
261                                 struct lsd_reply_data *lrd, loff_t off,
262                                 struct thandle *th)
263 {
264         struct tgt_thread_info  *tti = tgt_th_info(env);
265         struct dt_object        *dto;
266         struct lsd_reply_data   *buf = &tti->tti_lrd;
267
268         lrd->lrd_result = ptlrpc_status_hton(lrd->lrd_result);
269
270         buf->lrd_transno         = cpu_to_le64(lrd->lrd_transno);
271         buf->lrd_xid             = cpu_to_le64(lrd->lrd_xid);
272         buf->lrd_data            = cpu_to_le64(lrd->lrd_data);
273         buf->lrd_result          = cpu_to_le32(lrd->lrd_result);
274         buf->lrd_client_gen      = cpu_to_le32(lrd->lrd_client_gen);
275
276         lrd->lrd_result = ptlrpc_status_ntoh(lrd->lrd_result);
277
278         tti->tti_off = off;
279         tti->tti_buf.lb_buf = buf;
280         tti->tti_buf.lb_len = sizeof(*buf);
281
282         dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
283         return dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
284 }
285
286 /* Read the reply data from reply_data file of target @tgt at offset @off
287  * into structure @lrd
288  */
289 static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt,
290                                struct lsd_reply_data *lrd, loff_t off)
291 {
292         int                      rc;
293         struct tgt_thread_info  *tti = tgt_th_info(env);
294         struct lsd_reply_data   *buf = &tti->tti_lrd;
295
296         tti->tti_off = off;
297         tti->tti_buf.lb_buf = buf;
298         tti->tti_buf.lb_len = sizeof(*buf);
299
300         rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
301                             &tti->tti_off);
302         if (rc != 0)
303                 return rc;
304
305         lrd->lrd_transno         = le64_to_cpu(buf->lrd_transno);
306         lrd->lrd_xid             = le64_to_cpu(buf->lrd_xid);
307         lrd->lrd_data            = le64_to_cpu(buf->lrd_data);
308         lrd->lrd_result          = le32_to_cpu(buf->lrd_result);
309         lrd->lrd_client_gen      = le32_to_cpu(buf->lrd_client_gen);
310         lrd->lrd_batch_idx       = le32_to_cpu(buf->lrd_batch_idx);
311         return 0;
312 }
313
314
315 /* Free the in-memory reply data structure @trd and release
316  * the corresponding slot in the reply_data file of target @lut
317  * Called with ted_lcd_lock held
318  */
319 static void tgt_free_reply_data(struct lu_target *lut,
320                                 struct tg_export_data *ted,
321                                 struct tg_reply_data *trd)
322 {
323         CDEBUG(D_TRACE, "%s: free reply data %p: xid %llu, transno %llu, "
324                "client gen %u, slot idx %d\n",
325                lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
326                trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
327                trd->trd_index);
328
329         LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
330
331         list_del(&trd->trd_list);
332         ted->ted_reply_cnt--;
333         if (lut != NULL && trd->trd_index != TRD_INDEX_MEMORY)
334                 tgt_clear_reply_slot(lut, trd->trd_index);
335         OBD_FREE_PTR(trd);
336 }
337
338 /* Release the reply data @trd from target @lut
339  * The reply data with the highest transno for this export
340  * is retained to ensure correctness of target recovery
341  * Called with ted_lcd_lock held
342  */
343 static void tgt_release_reply_data(struct lu_target *lut,
344                                    struct tg_export_data *ted,
345                                    struct tg_reply_data *trd)
346 {
347         CDEBUG(D_TRACE, "%s: release reply data %p: xid %llu, transno %llu, "
348                "client gen %u, slot idx %d\n",
349                lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
350                trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
351                trd->trd_index);
352
353         LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
354
355         /* Do not free the reply data corresponding to the
356          * highest transno of this export.
357          * This ensures on-disk reply data is kept and
358          * last committed transno can be restored from disk in case
359          * of target recovery
360          */
361         if (trd->trd_reply.lrd_transno == ted->ted_lcd->lcd_last_transno) {
362                 /* free previous retained reply */
363                 if (ted->ted_reply_last != NULL)
364                         tgt_free_reply_data(lut, ted, ted->ted_reply_last);
365                 /* retain the reply */
366                 list_del_init(&trd->trd_list);
367                 ted->ted_reply_last = trd;
368         } else {
369                 tgt_free_reply_data(lut, ted, trd);
370         }
371 }
372
373 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
374 {
375         tti->tti_buf.lb_buf = &tti->tti_lsd;
376         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
377         return &tti->tti_buf;
378 }
379
380 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
381 {
382         tti->tti_buf.lb_buf = &tti->tti_lcd;
383         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
384         return &tti->tti_buf;
385 }
386
387 static inline bool tgt_is_multimodrpcs_record(struct lu_target *tgt,
388                                               struct lsd_client_data *lcd)
389 {
390         return tgt->lut_lsd.lsd_feature_incompat & OBD_INCOMPAT_MULTI_RPCS &&
391                 lcd->lcd_generation != 0;
392 }
393
394 /**
395  * Allocate in-memory data for client slot related to export.
396  */
397 int tgt_client_alloc(struct obd_export *exp)
398 {
399         ENTRY;
400         LASSERT(exp != exp->exp_obd->obd_self_export);
401
402         spin_lock_init(&exp->exp_target_data.ted_nodemap_lock);
403         INIT_LIST_HEAD(&exp->exp_target_data.ted_nodemap_member);
404         spin_lock_init(&exp->exp_target_data.ted_fmd_lock);
405         INIT_LIST_HEAD(&exp->exp_target_data.ted_fmd_list);
406
407         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
408         if (exp->exp_target_data.ted_lcd == NULL)
409                 RETURN(-ENOMEM);
410         /* Mark that slot is not yet valid, 0 doesn't work here */
411         exp->exp_target_data.ted_lr_idx = -1;
412         INIT_LIST_HEAD(&exp->exp_target_data.ted_reply_list);
413         mutex_init(&exp->exp_target_data.ted_lcd_lock);
414         RETURN(0);
415 }
416 EXPORT_SYMBOL(tgt_client_alloc);
417
418 /**
419  * Free in-memory data for client slot related to export.
420  */
421 void tgt_client_free(struct obd_export *exp)
422 {
423         struct tg_export_data   *ted = &exp->exp_target_data;
424         struct lu_target        *lut = class_exp2tgt(exp);
425         struct tg_reply_data    *trd, *tmp;
426
427         LASSERT(exp != exp->exp_obd->obd_self_export);
428
429         tgt_fmd_cleanup(exp);
430
431         /* free reply data */
432         mutex_lock(&ted->ted_lcd_lock);
433         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
434                 tgt_release_reply_data(lut, ted, trd);
435         }
436         if (ted->ted_reply_last != NULL) {
437                 tgt_free_reply_data(lut, ted, ted->ted_reply_last);
438                 ted->ted_reply_last = NULL;
439         }
440         mutex_unlock(&ted->ted_lcd_lock);
441
442         if (!hlist_unhashed(&exp->exp_gen_hash))
443                 cfs_hash_del(exp->exp_obd->obd_gen_hash,
444                              &ted->ted_lcd->lcd_generation,
445                              &exp->exp_gen_hash);
446
447         OBD_FREE_PTR(ted->ted_lcd);
448         ted->ted_lcd = NULL;
449
450         /* Target may have been freed (see LU-7430)
451          * Slot may be not yet assigned */
452         if (((struct obd_device_target *)(&exp->exp_obd->u))->obt_magic !=
453             OBT_MAGIC ||
454             ted->ted_lr_idx < 0)
455                 return;
456
457         /* Clear bit when lcd is freed */
458         LASSERT(lut && lut->lut_client_bitmap);
459         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
460                 CERROR("%s: client %u bit already clear in bitmap\n",
461                        exp->exp_obd->obd_name, ted->ted_lr_idx);
462                 LBUG();
463         }
464 }
465 EXPORT_SYMBOL(tgt_client_free);
466
467 static inline void tgt_check_lcd(const char *obd_name, int index,
468                                  struct lsd_client_data *lcd)
469 {
470         size_t uuid_size = sizeof(lcd->lcd_uuid);
471
472         if (strnlen((char*)lcd->lcd_uuid, uuid_size) == uuid_size) {
473                 lcd->lcd_uuid[uuid_size - 1] = '\0';
474
475                 LCONSOLE_ERROR("the client UUID (%s) on %s for exports stored in last_rcvd(index = %d) is bad!\n",
476                                lcd->lcd_uuid, obd_name, index);
477         }
478 }
479
480 static int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
481                                 struct lsd_client_data *lcd,
482                                 loff_t *off, int index)
483 {
484         struct tgt_thread_info  *tti = tgt_th_info(env);
485         int                      rc;
486
487         tti_buf_lcd(tti);
488         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
489         if (rc == 0) {
490                 tgt_check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
491                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
492                 lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
493                 lcd->lcd_last_close_result =
494                         ptlrpc_status_ntoh(lcd->lcd_last_close_result);
495         }
496
497         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = %llu"
498                ", last_xid = %llu, last_result = %u, last_data = %u, "
499                "last_close_transno = %llu, last_close_xid = %llu, "
500                "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
501                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
502                lcd->lcd_last_result, lcd->lcd_last_data,
503                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
504                lcd->lcd_last_close_result, rc);
505         return rc;
506 }
507
508 static int tgt_client_data_write(const struct lu_env *env,
509                                  struct lu_target *tgt,
510                                  struct lsd_client_data *lcd,
511                                  loff_t *off, struct thandle *th)
512 {
513         struct tgt_thread_info *tti = tgt_th_info(env);
514         struct dt_object        *dto;
515
516         lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
517         lcd->lcd_last_close_result =
518                 ptlrpc_status_hton(lcd->lcd_last_close_result);
519         lcd_cpu_to_le(lcd, &tti->tti_lcd);
520         tti_buf_lcd(tti);
521
522         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
523         return dt_record_write(env, dto, &tti->tti_buf, off, th);
524 }
525
/* Commit-callback context allocated by tgt_new_client_cb_add() and freed by
 * tgt_cb_new_client() when the callback runs.
 */
struct tgt_new_client_callback {
        /* embedded callback; container_of() maps it back to this struct */
        struct dt_txn_commit_cb  lncc_cb;
        /* export taken with class_export_cb_get(), put by the callback */
        struct obd_export       *lncc_exp;
};
530
531 static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
532                               struct dt_txn_commit_cb *cb, int err)
533 {
534         struct tgt_new_client_callback *ccb;
535
536         ccb = container_of(cb, struct tgt_new_client_callback, lncc_cb);
537
538         LASSERT(ccb->lncc_exp->exp_obd);
539
540         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
541                ccb->lncc_exp->exp_obd->obd_name,
542                ccb->lncc_exp->exp_client_uuid.uuid);
543
544         spin_lock(&ccb->lncc_exp->exp_lock);
545
546         ccb->lncc_exp->exp_need_sync = 0;
547
548         spin_unlock(&ccb->lncc_exp->exp_lock);
549         class_export_cb_put(ccb->lncc_exp);
550
551         OBD_FREE_PTR(ccb);
552 }
553
554 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
555 {
556         struct tgt_new_client_callback  *ccb;
557         struct dt_txn_commit_cb         *dcb;
558         int                              rc;
559
560         OBD_ALLOC_PTR(ccb);
561         if (ccb == NULL)
562                 return -ENOMEM;
563
564         ccb->lncc_exp = class_export_cb_get(exp);
565
566         dcb = &ccb->lncc_cb;
567         dcb->dcb_func = tgt_cb_new_client;
568         INIT_LIST_HEAD(&dcb->dcb_linkage);
569         strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
570
571         rc = dt_trans_cb_add(th, dcb);
572         if (rc) {
573                 class_export_cb_put(exp);
574                 OBD_FREE_PTR(ccb);
575         }
576         return rc;
577 }
578
/**
 * Update client data in last_rcvd
 *
 * Writes the export's in-memory client record (ted_lcd) at ted_lr_off of
 * last_rcvd in a local transaction of its own. Nothing is written when the
 * bottom device is read-only.
 *
 * Returns 0 on success or on a read-only device, -EINVAL when the export
 * has no target, otherwise the error from the transaction or write calls.
 */
static int tgt_client_data_update(const struct lu_env *env,
                                  struct obd_export *exp)
{
        struct tg_export_data   *ted = &exp->exp_target_data;
        struct lu_target        *tgt = class_exp2tgt(exp);
        struct tgt_thread_info  *tti = tgt_th_info(env);
        struct thandle          *th;
        int                      rc = 0;

        ENTRY;

        if (unlikely(tgt == NULL)) {
                CDEBUG(D_ERROR, "%s: No target for connected export\n",
                          class_exp2obd(exp)->obd_name);
                RETURN(-EINVAL);
        }

        if (tgt->lut_bottom->dd_rdonly)
                RETURN(0);

        th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));

        /* declare a write of one client record at this export's offset */
        tti_buf_lcd(tti);
        rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
                                     &tti->tti_buf,
                                     ted->ted_lr_off, th);
        if (rc)
                GOTO(out, rc);

        rc = dt_trans_start_local(env, tgt->lut_bottom, th);
        if (rc)
                GOTO(out, rc);

        /* ted_lcd is written below, so hold its lock across the write */
        mutex_lock(&ted->ted_lcd_lock);

        /*
         * Until this operations will be committed the sync is needed
         * for this export. This should be done _after_ starting the
         * transaction so that many connecting clients will not bring
         * server down with lots of sync writes.
         */
        rc = tgt_new_client_cb_add(th, exp);
        if (rc) {
                /* can't add callback, do sync now */
                th->th_sync = 1;
        } else {
                spin_lock(&exp->exp_lock);
                exp->exp_need_sync = 1;
                spin_unlock(&exp->exp_lock);
        }

        /* rc from the callback registration is overwritten by the write */
        tti->tti_off = ted->ted_lr_off;
        rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);

        mutex_unlock(&ted->ted_lcd_lock);

        EXIT;
out:
        dt_trans_stop(env, tgt->lut_bottom, th);
        /* NOTE(review): the message says "client data" but prints the
         * server's lsd_uuid and lsd_last_transno -- confirm whether the
         * client's lcd fields were intended.
         */
        CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
               "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
               tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);

        return rc;
}
649
650 static int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
651 {
652         struct tgt_thread_info  *tti = tgt_th_info(env);
653         int                      rc;
654
655         tti->tti_off = 0;
656         tti_buf_lsd(tti);
657         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
658                             &tti->tti_off);
659         if (rc == 0)
660                 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
661
662         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
663                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
664                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
665         return rc;
666 }
667
668 static int tgt_server_data_write(const struct lu_env *env,
669                                  struct lu_target *tgt, struct thandle *th)
670 {
671         struct tgt_thread_info  *tti = tgt_th_info(env);
672         struct dt_object        *dto;
673         int                      rc;
674
675         ENTRY;
676
677         tti->tti_off = 0;
678         tti_buf_lsd(tti);
679         lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
680
681         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
682         rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
683
684         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
685                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
686                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
687
688         RETURN(rc);
689 }
690
691 /**
692  * Update server data in last_rcvd
693  */
694 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
695                            int sync)
696 {
697         struct tgt_thread_info  *tti = tgt_th_info(env);
698         struct thandle          *th;
699         int                      rc = 0;
700
701         ENTRY;
702
703         CDEBUG(D_SUPER,
704                "%s: mount_count is %llu, last_transno is %llu\n",
705                tgt->lut_lsd.lsd_uuid, obd2obt(tgt->lut_obd)->obt_mount_count,
706                tgt->lut_last_transno);
707
708         /* Always save latest transno to keep it fresh */
709         spin_lock(&tgt->lut_translock);
710         tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
711         spin_unlock(&tgt->lut_translock);
712
713         if (tgt->lut_bottom->dd_rdonly)
714                 RETURN(0);
715
716         th = dt_trans_create(env, tgt->lut_bottom);
717         if (IS_ERR(th))
718                 RETURN(PTR_ERR(th));
719
720         th->th_sync = sync;
721
722         tti_buf_lsd(tti);
723         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
724                                      &tti->tti_buf, tti->tti_off, th);
725         if (rc)
726                 GOTO(out, rc);
727
728         rc = dt_trans_start(env, tgt->lut_bottom, th);
729         if (rc)
730                 GOTO(out, rc);
731
732         rc = tgt_server_data_write(env, tgt, th);
733 out:
734         dt_trans_stop(env, tgt->lut_bottom, th);
735
736         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
737                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
738                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
739         RETURN(rc);
740 }
741 EXPORT_SYMBOL(tgt_server_data_update);
742
743 static int tgt_truncate_last_rcvd(const struct lu_env *env,
744                                   struct lu_target *tgt, loff_t size)
745 {
746         struct dt_object *dt = tgt->lut_last_rcvd;
747         struct thandle   *th;
748         struct lu_attr    attr;
749         int               rc;
750
751         ENTRY;
752
753         if (tgt->lut_bottom->dd_rdonly)
754                 RETURN(0);
755
756         attr.la_size = size;
757         attr.la_valid = LA_SIZE;
758
759         th = dt_trans_create(env, tgt->lut_bottom);
760         if (IS_ERR(th))
761                 RETURN(PTR_ERR(th));
762         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
763         if (rc)
764                 GOTO(cleanup, rc);
765         rc = dt_declare_attr_set(env, dt, &attr, th);
766         if (rc)
767                 GOTO(cleanup, rc);
768         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
769         if (rc)
770                 GOTO(cleanup, rc);
771
772         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th);
773         if (rc == 0)
774                 rc = dt_attr_set(env, dt, &attr, th);
775
776 cleanup:
777         dt_trans_stop(env, tgt->lut_bottom, th);
778
779         RETURN(rc);
780 }
781
782 static void tgt_client_epoch_update(const struct lu_env *env,
783                                     struct obd_export *exp)
784 {
785         struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
786         struct lu_target        *tgt = class_exp2tgt(exp);
787
788         LASSERT(tgt && tgt->lut_bottom);
789         /** VBR: set client last_epoch to current epoch */
790         if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
791                 return;
792         lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
793         tgt_client_data_update(env, exp);
794 }
795
796 /**
797  * Update boot epoch when recovery ends
798  */
799 void tgt_boot_epoch_update(struct lu_target *tgt)
800 {
801         struct lu_env            env;
802         struct ptlrpc_request   *req;
803         __u32                    start_epoch;
804         LIST_HEAD(client_list);
805         int                      rc;
806
807         if (tgt->lut_obd->obd_stopping)
808                 return;
809
810         rc = lu_env_init(&env, LCT_DT_THREAD);
811         if (rc) {
812                 CERROR("%s: can't initialize environment: rc = %d\n",
813                         tgt->lut_obd->obd_name, rc);
814                 return;
815         }
816
817         spin_lock(&tgt->lut_translock);
818         start_epoch = (tgt->lut_last_transno >> LR_EPOCH_BITS) + 1;
819         tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
820         tgt->lut_lsd.lsd_start_epoch = start_epoch;
821         spin_unlock(&tgt->lut_translock);
822
823         /**
824          * The recovery is not yet finished and final queue can still be updated
825          * with resend requests. Move final list to separate one for processing
826          */
827         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
828         list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
829         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
830
831         /**
832          * go through list of exports participated in recovery and
833          * set new epoch for them
834          */
835         list_for_each_entry(req, &client_list, rq_list) {
836                 LASSERT(!req->rq_export->exp_delayed);
837                 if (!req->rq_export->exp_vbr_failed)
838                         tgt_client_epoch_update(&env, req->rq_export);
839         }
840         /** return list back at once */
841         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
842         list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
843         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
844
845         /**
846          * Clear MULTI RPCS incompatibility flag if there is no multi-rpcs
847          * client in last_rcvd file
848          */
849         if (atomic_read(&tgt->lut_num_clients) == 0)
850                 tgt->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
851
852         /** update server epoch */
853         tgt_server_data_update(&env, tgt, 1);
854         lu_env_fini(&env);
855 }
856
857 /**
858  * Commit callback data, used to update the last_committed values.
859  */
860 struct tgt_last_committed_callback {
861         struct dt_txn_commit_cb  llcc_cb;       /* callback registered on the transaction */
862         struct lu_target        *llcc_tgt;      /* target whose obd_last_committed is updated */
863         struct obd_export       *llcc_exp;      /* export, referenced via class_export_cb_get() */
864         __u64                    llcc_transno;  /* transno reported as committed */
865 };
866
867 static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
868                                   struct dt_txn_commit_cb *cb, int err)
869 {
870         struct tgt_last_committed_callback *ccb;
871
872         ccb = container_of(cb, struct tgt_last_committed_callback, llcc_cb);
873
874         LASSERT(ccb->llcc_exp);
875         LASSERT(ccb->llcc_tgt != NULL);
876         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
877
878         if (th->th_reserved_quota.lqi_space > 0) {
879                 CDEBUG(D_QUOTA, "free quota %llu %llu\n",
880                        th->th_reserved_quota.lqi_id.qid_gid,
881                        th->th_reserved_quota.lqi_space);
882
883                 /* env can be NULL for freeing reserved quota */
884                 th->th_reserved_quota.lqi_space *= -1;
885                 dt_reserve_or_free_quota(NULL, th->th_dev,
886                                          &th->th_reserved_quota);
887         }
888
889         /* error hit, don't update last committed to provide chance to
890          * replay data after fail */
891         if (err != 0)
892                 goto out;
893
894         /* Fast path w/o spinlock, if exp_last_committed was updated
895          * with higher transno, no need to take spinlock and check,
896          * also no need to update obd_last_committed. */
897         if (ccb->llcc_transno <= ccb->llcc_exp->exp_last_committed)
898                 goto out;
899         spin_lock(&ccb->llcc_tgt->lut_translock);
900         if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
901                 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
902
903         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
904                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
905                 spin_unlock(&ccb->llcc_tgt->lut_translock);
906
907                 ptlrpc_commit_replies(ccb->llcc_exp);
908                 tgt_cancel_slc_locks(ccb->llcc_tgt, ccb->llcc_transno);
909         } else {
910                 spin_unlock(&ccb->llcc_tgt->lut_translock);
911         }
912
913         CDEBUG(D_HA, "%s: transno %lld is committed\n",
914                ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
915
916 out:
917         class_export_cb_put(ccb->llcc_exp);
918         OBD_FREE_PTR(ccb);
919 }
920
921 /**
922  * Add commit callback function, it returns a non-zero value to inform
923  * caller to use sync transaction if necessary.
924  */
925 static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
926                                   struct obd_export *exp, __u64 transno)
927 {
928         struct tgt_last_committed_callback      *ccb;
929         struct dt_txn_commit_cb                 *dcb;
930         int                                      rc;
931
932         OBD_ALLOC_PTR(ccb);
933         if (ccb == NULL)
934                 return -ENOMEM;
935
936         ccb->llcc_tgt = tgt;
937         ccb->llcc_exp = class_export_cb_get(exp);
938         ccb->llcc_transno = transno;
939
940         dcb = &ccb->llcc_cb;
941         dcb->dcb_func = tgt_cb_last_committed;
942         INIT_LIST_HEAD(&dcb->dcb_linkage);
943         strlcpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
944
945         rc = dt_trans_cb_add(th, dcb);
946         if (rc) {
947                 class_export_cb_put(exp);
948                 OBD_FREE_PTR(ccb);
949         }
950
951         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
952                 /* report failure to force synchronous operation */
953                 return -EPERM;
954
955         /* if exp_need_sync is set, return non-zero value to force
956          * a sync transaction. */
957         return rc ? rc : exp->exp_need_sync;
958 }
959
960 static int tgt_is_local_client(const struct lu_env *env,
961                                       struct obd_export *exp)
962 {
963         struct lu_target        *tgt = class_exp2tgt(exp);
964         struct tgt_session_info *tsi = tgt_ses_info(env);
965         struct ptlrpc_request   *req = tgt_ses_req(tsi);
966
967         if (exp_connect_flags(exp) & OBD_CONNECT_MDS ||
968             exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
969                 return 0;
970         if (tgt->lut_local_recovery)
971                 return 0;
972         if (!req)
973                 return 0;
974         if (!LNetIsPeerLocal(&req->rq_peer.nid))
975                 return 0;
976
977         return 1;
978 }
979
980 /**
981  * Add new client to the last_rcvd upon new connection.
982  *
983  * We use a bitmap to locate a free space in the last_rcvd file and initialize
984  * tg_export_data.
985  */
986 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
987 {
988         struct tg_export_data   *ted = &exp->exp_target_data;
989         struct lu_target        *tgt = class_exp2tgt(exp);
990         int                      rc = 0, idx;
991
992         ENTRY;
993
994         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
995         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
996                 RETURN(0);
997
998         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
999                 RETURN(0);
1000
1001         if (tgt_is_local_client(env, exp)) {
1002                 LCONSOLE_WARN("%s: local client %s w/o recovery\n",
1003                               exp->exp_obd->obd_name, ted->ted_lcd->lcd_uuid);
1004                 exp->exp_no_recovery = 1;
1005                 RETURN(0);
1006         }
1007
1008         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
1009          * there's no need for extra complication here
1010          */
1011         idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
1012 repeat:
1013         if (idx >= LR_MAX_CLIENTS ||
1014             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
1015                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
1016                        tgt->lut_obd->obd_name,  idx);
1017                 RETURN(-EOVERFLOW);
1018         }
1019         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1020                 idx = find_next_zero_bit(tgt->lut_client_bitmap,
1021                                              LR_MAX_CLIENTS, idx);
1022                 goto repeat;
1023         }
1024
1025         ted->ted_lr_idx = idx;
1026         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1027                           idx * tgt->lut_lsd.lsd_client_size;
1028
1029         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
1030
1031         if (tgt_is_multimodrpcs_client(exp)) {
1032                 /* Set MULTI RPCS incompatibility flag to prevent previous
1033                  * Lustre versions to mount a target with reply_data file */
1034                 if (!(tgt->lut_lsd.lsd_feature_incompat &
1035                       OBD_INCOMPAT_MULTI_RPCS)) {
1036                         tgt->lut_lsd.lsd_feature_incompat |=
1037                                                         OBD_INCOMPAT_MULTI_RPCS;
1038                         rc = tgt_server_data_update(env, tgt, 1);
1039                         if (rc < 0) {
1040                                 CERROR("%s: unable to set MULTI RPCS "
1041                                        "incompatibility flag\n",
1042                                        exp->exp_obd->obd_name);
1043                                 RETURN(rc);
1044                         }
1045                 }
1046
1047                 /* assign client slot generation */
1048                 ted->ted_lcd->lcd_generation =
1049                                 atomic_inc_return(&tgt->lut_client_generation);
1050         } else {
1051                 ted->ted_lcd->lcd_generation = 0;
1052         }
1053
1054         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s' "
1055                "generation %d\n",
1056                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1057                ted->ted_lcd->lcd_uuid, ted->ted_lcd->lcd_generation);
1058
1059         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
1060                 RETURN(-ENOSPC);
1061
1062         rc = tgt_client_data_update(env, exp);
1063         if (rc) {
1064                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
1065                        tgt->lut_obd->obd_name, idx, rc);
1066                 RETURN(rc);
1067         }
1068
1069         if (tgt_is_multimodrpcs_client(exp))
1070                 atomic_inc(&tgt->lut_num_clients);
1071
1072         RETURN(0);
1073 }
1074 EXPORT_SYMBOL(tgt_client_new);
1075
1076 /* Add an existing client to the MDS in-memory state based on
1077  * a client that was previously found in the last_rcvd file and
1078  * already has an assigned slot (idx >= 0).
1079  *
1080  * It should not be possible to fail adding an existing client - otherwise
1081  * mdt_init_server_data() callsite needs to be fixed.
1082  */
1083 int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
1084 {
1085         struct tg_export_data   *ted = &exp->exp_target_data;
1086         struct lu_target        *tgt = class_exp2tgt(exp);
1087
1088         ENTRY;
1089
1090         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
1091         LASSERTF(idx >= 0, "%d\n", idx);
1092
1093         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
1094             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1095                 RETURN(0);
1096
1097         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1098                 CERROR("%s: client %d: bit already set in bitmap!!\n",
1099                        tgt->lut_obd->obd_name,  idx);
1100                 LBUG();
1101         }
1102
1103         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added, "
1104                "generation %d\n",
1105                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid,
1106                ted->ted_lcd->lcd_generation);
1107
1108         ted->ted_lr_idx = idx;
1109         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1110                           idx * tgt->lut_lsd.lsd_client_size;
1111
1112         mutex_init(&ted->ted_lcd_lock);
1113
1114         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
1115
1116         RETURN(0);
1117 }
1118
1119 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
1120 {
1121         struct tg_export_data   *ted = &exp->exp_target_data;
1122         struct lu_target        *tgt = class_exp2tgt(exp);
1123         int                      rc;
1124
1125         ENTRY;
1126
1127         LASSERT(ted->ted_lcd);
1128
1129         if (unlikely(tgt == NULL)) {
1130                 CDEBUG(D_ERROR, "%s: No target for connected export\n",
1131                        class_exp2obd(exp)->obd_name);
1132                 RETURN(-EINVAL);
1133         }
1134
1135         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
1136         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
1137                     (char *)tgt->lut_obd->obd_uuid.uuid) ||
1138             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT ||
1139             exp->exp_no_recovery)
1140                 RETURN(0);
1141
1142         /* Slot may be not yet assigned, use case is race between Client
1143          * reconnect and forced eviction */
1144         if (ted->ted_lr_idx < 0) {
1145                 CWARN("%s: client with UUID '%s' not in bitmap\n",
1146                       tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid);
1147                 RETURN(0);
1148         }
1149
1150         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
1151                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1152                ted->ted_lcd->lcd_uuid);
1153
1154         /* Clear the bit _after_ zeroing out the client so we don't
1155            race with filter_client_add and zero out new clients.*/
1156         if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
1157                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
1158                        tgt->lut_obd->obd_name, ted->ted_lr_idx);
1159                 LBUG();
1160         }
1161
1162         /* Do not erase record for recoverable client. */
1163         if (exp->exp_flags & OBD_OPT_FAILOVER)
1164                 RETURN(0);
1165
1166         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_DEL))
1167                 RETURN(0);
1168
1169         /* Make sure the server's last_transno is up to date.
1170          * This should be done before zeroing client slot so last_transno will
1171          * be in server data or in client data in case of failure */
1172         rc = tgt_server_data_update(env, tgt, 0);
1173         if (rc != 0) {
1174                 CERROR("%s: failed to update server data, skip client %s "
1175                        "zeroing, rc %d\n", tgt->lut_obd->obd_name,
1176                        ted->ted_lcd->lcd_uuid, rc);
1177                 RETURN(rc);
1178         }
1179
1180         /* Race between an eviction and a disconnection ?*/
1181         mutex_lock(&ted->ted_lcd_lock);
1182         if (ted->ted_lcd->lcd_uuid[0] == '\0') {
1183                 mutex_unlock(&ted->ted_lcd_lock);
1184                 RETURN(rc);
1185         }
1186
1187         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
1188         mutex_unlock(&ted->ted_lcd_lock);
1189
1190         rc = tgt_client_data_update(env, exp);
1191
1192         if (!rc && tgt_is_multimodrpcs_record(tgt, ted->ted_lcd))
1193                 atomic_dec(&tgt->lut_num_clients);
1194
1195         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
1196                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
1197                tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
1198                ted->ted_lr_idx, ted->ted_lr_off, rc);
1199         RETURN(rc);
1200 }
1201 EXPORT_SYMBOL(tgt_client_del);
1202
1203 static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
1204 {
1205         struct tg_export_data   *ted = &exp->exp_target_data;
1206         struct lu_target        *lut = class_exp2tgt(exp);
1207         struct tg_reply_data    *trd, *tmp;
1208
1209         if (tag == 0)
1210                 return;
1211
1212         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
1213                 if (trd->trd_tag != tag)
1214                         continue;
1215
1216                 LASSERT(ergo(tgt_is_increasing_xid_client(exp),
1217                              trd->trd_reply.lrd_xid <= xid));
1218
1219                 ted->ted_release_tag++;
1220                 tgt_release_reply_data(lut, ted, trd);
1221         }
1222 }
1223
1224 static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
1225                        struct tg_export_data *ted, struct tg_reply_data *trd,
1226                        struct ptlrpc_request *req,
1227                        struct thandle *th, bool update_lrd_file)
1228 {
1229         struct tgt_session_info *tsi = NULL;
1230         struct lsd_reply_data *lrd;
1231         int i = -1;
1232         int rc;
1233
1234         lrd = &trd->trd_reply;
1235         /* update export last transno */
1236         mutex_lock(&ted->ted_lcd_lock);
1237         if (lrd->lrd_transno > ted->ted_lcd->lcd_last_transno)
1238                 ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
1239         mutex_unlock(&ted->ted_lcd_lock);
1240
1241         if (!tgt) {
1242                 trd->trd_index = TRD_INDEX_MEMORY;
1243                 GOTO(add_reply_data, rc = 0);
1244         }
1245
1246         if (env) {
1247                 tsi = tgt_ses_info(env);
1248                 if (tsi->tsi_batch_trd) {
1249                         LASSERT(tsi->tsi_batch_env);
1250                         trd = tsi->tsi_batch_trd;
1251                         i = trd->trd_index;
1252                 }
1253         }
1254
1255         if (i == -1) {
1256                 /* find a empty slot */
1257                 i = tgt_find_free_reply_slot(tgt);
1258                 if (unlikely(i < 0)) {
1259                         CERROR("%s: couldn't find a slot for reply data: rc = %d\n",
1260                                tgt_name(tgt), i);
1261                         RETURN(i);
1262                 }
1263                 trd->trd_index = i;
1264         }
1265
1266         if (update_lrd_file) {
1267                 loff_t  off;
1268
1269                 /* write reply data to disk */
1270                 off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
1271                 rc = tgt_reply_data_write(env, tgt, lrd, off, th);
1272                 if (unlikely(rc != 0)) {
1273                         CERROR("%s: can't update %s file: rc = %d\n",
1274                                tgt_name(tgt), REPLY_DATA, rc);
1275                         GOTO(free_slot, rc);
1276                 }
1277         }
1278
1279 add_reply_data:
1280         /* add reply data to target export's reply list */
1281         mutex_lock(&ted->ted_lcd_lock);
1282         if (req != NULL) {
1283                 int exclude = tgt_is_increasing_xid_client(req->rq_export) ?
1284                               MSG_REPLAY : MSG_REPLAY|MSG_RESENT;
1285
1286                 if (req->rq_obsolete) {
1287                         CDEBUG(D_INFO,
1288                                "drop reply data update for obsolete req xid=%llu,"
1289                                "transno=%llu, tag=%hu\n", req->rq_xid,
1290                                lrd->lrd_transno, trd->trd_tag);
1291                         mutex_unlock(&ted->ted_lcd_lock);
1292                         GOTO(free_slot, rc = -EBADR);
1293                 }
1294
1295                 if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude) &&
1296                     !(tsi && tsi->tsi_batch_env &&
1297                       trd->trd_reply.lrd_batch_idx > 0))
1298                         tgt_clean_by_tag(req->rq_export, req->rq_xid,
1299                                          trd->trd_tag);
1300         }
1301
1302         /*
1303          * For the batched RPC, all sub requests use one common @trd for the
1304          * reply data.
1305          */
1306         if (list_empty(&trd->trd_list)) {
1307                 list_add(&trd->trd_list, &ted->ted_reply_list);
1308                 ted->ted_reply_cnt++;
1309                 if (ted->ted_reply_cnt > ted->ted_reply_max)
1310                         ted->ted_reply_max = ted->ted_reply_cnt;
1311         }
1312         mutex_unlock(&ted->ted_lcd_lock);
1313
1314         CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
1315                "tag %hu, client gen %u, slot idx %d\n",
1316                trd, lrd->lrd_xid, lrd->lrd_transno,
1317                trd->trd_tag, lrd->lrd_client_gen, trd->trd_index);
1318
1319         RETURN(0);
1320
1321 free_slot:
1322         if (tgt != NULL)
1323                 tgt_clear_reply_slot(tgt, trd->trd_index);
1324         return rc;
1325 }
1326
1327 int tgt_mk_reply_data(const struct lu_env *env,
1328                       struct lu_target *tgt,
1329                       struct tg_export_data *ted,
1330                       struct ptlrpc_request *req,
1331                       __u64 opdata,
1332                       struct thandle *th,
1333                       bool write_update,
1334                       __u64 transno)
1335 {
1336         struct tg_reply_data *trd = NULL;
1337         struct lsd_reply_data *lrd;
1338         __u64 *pre_versions = NULL;
1339         struct tgt_session_info *tsi = NULL;
1340         int rc;
1341
1342         if (env != NULL) {
1343                 tsi = tgt_ses_info(env);
1344                 if (tsi->tsi_batch_trd) {
1345                         LASSERT(tsi->tsi_batch_env);
1346                         trd = tsi->tsi_batch_trd;
1347                 }
1348         }
1349
1350         if (trd == NULL) {
1351                 OBD_ALLOC_PTR(trd);
1352                 if (unlikely(trd == NULL))
1353                         RETURN(-ENOMEM);
1354
1355                 INIT_LIST_HEAD(&trd->trd_list);
1356         }
1357
1358         /* fill reply data information */
1359         lrd = &trd->trd_reply;
1360         lrd->lrd_transno = transno;
1361         if (tsi && tsi->tsi_batch_env) {
1362                 if (tsi->tsi_batch_idx == 0) {
1363                         LASSERT(req != NULL);
1364                         tsi->tsi_batch_trd = trd;
1365                         trd->trd_index = -1;
1366                         lrd->lrd_xid = req->rq_xid;
1367                         trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
1368                         lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
1369                 }
1370                 lrd->lrd_batch_idx = tsi->tsi_batch_idx;
1371         } else if (req != NULL) {
1372                 lrd->lrd_xid = req->rq_xid;
1373                 trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
1374                 lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
1375                 if (write_update) {
1376                         pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1377                         lrd->lrd_result = th->th_result;
1378                 }
1379         } else {
1380                 LASSERT(env != NULL);
1381                 LASSERT(tsi->tsi_xid != 0);
1382
1383                 lrd->lrd_xid = tsi->tsi_xid;
1384                 lrd->lrd_result = tsi->tsi_result;
1385                 lrd->lrd_client_gen = tsi->tsi_client_gen;
1386         }
1387
1388         lrd->lrd_data = opdata;
1389         if (pre_versions) {
1390                 trd->trd_pre_versions[0] = pre_versions[0];
1391                 trd->trd_pre_versions[1] = pre_versions[1];
1392                 trd->trd_pre_versions[2] = pre_versions[2];
1393                 trd->trd_pre_versions[3] = pre_versions[3];
1394         }
1395
1396         if (tsi && tsi->tsi_open_obj)
1397                 trd->trd_object = *lu_object_fid(&tsi->tsi_open_obj->do_lu);
1398
1399         rc = tgt_add_reply_data(env, tgt, ted, trd, req,
1400                                 th, write_update);
1401         if (rc < 0) {
1402                 OBD_FREE_PTR(trd);
1403                 if (rc == -EBADR)
1404                         rc = 0;
1405         }
1406         return rc;
1407
1408 }
1409 EXPORT_SYMBOL(tgt_mk_reply_data);
1410
1411 /*
1412  * last_rcvd & last_committed update callbacks
1413  */
1414 static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
1415                                 struct dt_object *obj, __u64 opdata,
1416                                 struct thandle *th, struct ptlrpc_request *req)
1417 {
1418         struct tgt_thread_info  *tti = tgt_th_info(env);
1419         struct tgt_session_info *tsi = tgt_ses_info(env);
1420         struct obd_export *exp = tsi->tsi_exp;
1421         struct tg_export_data *ted;
1422         __u64 *transno_p;
1423         bool nolcd = false;
1424         int rc = 0;
1425
1426         ENTRY;
1427
1428
1429         LASSERT(exp != NULL);
1430         ted = &exp->exp_target_data;
1431
1432         /* Some clients don't support recovery, and they don't have last_rcvd
1433          * client data:
1434          * 1. lightweight clients.
1435          * 2. local clients on MDS which doesn't enable "localrecov".
1436          * 3. OFD connect may cause transaction before export has last_rcvd
1437          *    slot.
1438          */
1439         if (ted->ted_lr_idx < 0)
1440                 nolcd = true;
1441
1442         if (req != NULL)
1443                 tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1444         else
1445                 /* From update replay, tti_transno should be set already */
1446                 LASSERT(tti->tti_transno != 0);
1447
1448         spin_lock(&tgt->lut_translock);
1449         if (th->th_result != 0) {
1450                 if (tti->tti_transno != 0) {
1451                         CERROR("%s: replay transno %llu failed: rc = %d\n",
1452                                tgt_name(tgt), tti->tti_transno, th->th_result);
1453                 }
1454         } else if (tti->tti_transno == 0) {
1455                 tti->tti_transno = ++tgt->lut_last_transno;
1456         } else {
1457                 /* should be replay */
1458                 if (tti->tti_transno > tgt->lut_last_transno)
1459                         tgt->lut_last_transno = tti->tti_transno;
1460         }
1461         spin_unlock(&tgt->lut_translock);
1462
1463         /** VBR: set new versions */
1464         if (th->th_result == 0 && obj != NULL) {
1465                 struct dt_object *dto = dt_object_locate(obj, th->th_dev);
1466                 dt_version_set(env, dto, tti->tti_transno, th);
1467         }
1468
1469         /* filling reply data */
1470         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1471                tti->tti_transno, tgt->lut_obd->obd_last_committed);
1472
1473         if (req != NULL) {
1474                 req->rq_transno = tti->tti_transno;
1475                 lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
1476         }
1477
1478         /* if can't add callback, do sync write */
1479         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp, tti->tti_transno);
1480
1481         if (nolcd) {
1482                 /* store transno in the last_rcvd header */
1483                 spin_lock(&tgt->lut_translock);
1484                 if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
1485                         tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
1486                         spin_unlock(&tgt->lut_translock);
1487                         /* Although current connection doesn't have slot
1488                          * in the last_rcvd, we still want to maintain
1489                          * the in-memory lsd_client_data structure in order to
1490                          * properly handle reply reconstruction. */
1491                         rc = tgt_server_data_write(env, tgt, th);
1492                 } else {
1493                         spin_unlock(&tgt->lut_translock);
1494                 }
1495         } else if (ted->ted_lr_off == 0) {
1496                 CERROR("%s: client idx %d has offset %lld\n",
1497                        tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
1498                 RETURN(-EINVAL);
1499         }
1500
1501         /* Target that supports multiple reply data */
1502         if (tgt_is_multimodrpcs_client(exp)) {
1503                 return tgt_mk_reply_data(env, tgt, ted, req, opdata, th,
1504                                          !!(req != NULL), tti->tti_transno);
1505         }
1506
1507         /* Enough for update replay, let's return */
1508         if (req == NULL)
1509                 RETURN(rc);
1510
1511         mutex_lock(&ted->ted_lcd_lock);
1512         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1513         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
1514                 transno_p = &ted->ted_lcd->lcd_last_close_transno;
1515                 ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
1516                 ted->ted_lcd->lcd_last_close_result = th->th_result;
1517         } else {
1518                 /* VBR: save versions in last_rcvd for reconstruct. */
1519                 __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1520
1521                 if (pre_versions) {
1522                         ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
1523                         ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
1524                         ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
1525                         ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
1526                 }
1527                 transno_p = &ted->ted_lcd->lcd_last_transno;
1528                 ted->ted_lcd->lcd_last_xid = req->rq_xid;
1529                 ted->ted_lcd->lcd_last_result = th->th_result;
1530                 /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
1531                  * see struct ldlm_reply->lock_policy_res1; */
1532                 ted->ted_lcd->lcd_last_data = opdata;
1533         }
1534
1535         /* Update transno in slot only if non-zero number, i.e. no errors */
1536         if (likely(tti->tti_transno != 0)) {
1537                 /* Don't overwrite bigger transaction number with lower one.
1538                  * That is not sign of problem in all cases, but in any case
1539                  * this value should be monotonically increased only. */
1540                 if (*transno_p > tti->tti_transno) {
1541                         if (!tgt->lut_no_reconstruct) {
1542                                 CERROR("%s: trying to overwrite bigger transno:"
1543                                        "on-disk: %llu, new: %llu replay: "
1544                                        "%d. See LU-617.\n", tgt_name(tgt),
1545                                        *transno_p, tti->tti_transno,
1546                                        req_is_replay(req));
1547                                 if (req_is_replay(req)) {
1548                                         spin_lock(&req->rq_export->exp_lock);
1549                                         req->rq_export->exp_vbr_failed = 1;
1550                                         spin_unlock(&req->rq_export->exp_lock);
1551                                 }
1552                                 mutex_unlock(&ted->ted_lcd_lock);
1553                                 RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
1554                         }
1555                 } else {
1556                         *transno_p = tti->tti_transno;
1557                 }
1558         }
1559
1560         if (!nolcd) {
1561                 tti->tti_off = ted->ted_lr_off;
1562                 if (CFS_FAIL_CHECK(OBD_FAIL_TGT_RCVD_EIO))
1563                         rc = -EIO;
1564                 else
1565                         rc = tgt_client_data_write(env, tgt, ted->ted_lcd,
1566                                                    &tti->tti_off, th);
1567                 if (rc < 0) {
1568                         mutex_unlock(&ted->ted_lcd_lock);
1569                         RETURN(rc);
1570                 }
1571         }
1572         mutex_unlock(&ted->ted_lcd_lock);
1573         RETURN(rc);
1574 }
1575
1576 /*
1577  * last_rcvd update for echo client simulation.
1578  * It updates last_rcvd client slot and version of object in
1579  * simple way but with all locks to simulate all drawbacks
1580  */
1581 static int tgt_last_rcvd_update_echo(const struct lu_env *env,
1582                                      struct lu_target *tgt,
1583                                      struct dt_object *obj,
1584                                      struct thandle *th,
1585                                      struct obd_export *exp)
1586 {
1587         struct tgt_thread_info  *tti = tgt_th_info(env);
1588         struct tg_export_data   *ted = &exp->exp_target_data;
1589         int                      rc = 0;
1590
1591         ENTRY;
1592
1593         tti->tti_transno = 0;
1594
1595         spin_lock(&tgt->lut_translock);
1596         if (th->th_result == 0)
1597                 tti->tti_transno = ++tgt->lut_last_transno;
1598         spin_unlock(&tgt->lut_translock);
1599
1600         /** VBR: set new versions */
1601         if (th->th_result == 0 && obj != NULL)
1602                 dt_version_set(env, obj, tti->tti_transno, th);
1603
1604         /* if can't add callback, do sync write */
1605         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
1606                                                 tti->tti_transno);
1607
1608         LASSERT(ted->ted_lr_off > 0);
1609
1610         mutex_lock(&ted->ted_lcd_lock);
1611         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1612         ted->ted_lcd->lcd_last_transno = tti->tti_transno;
1613         ted->ted_lcd->lcd_last_result = th->th_result;
1614
1615         tti->tti_off = ted->ted_lr_off;
1616         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
1617         mutex_unlock(&ted->ted_lcd_lock);
1618         RETURN(rc);
1619 }
1620
1621 static int tgt_clients_data_init(const struct lu_env *env,
1622                                  struct lu_target *tgt,
1623                                  unsigned long last_size)
1624 {
1625         struct obd_device       *obd = tgt->lut_obd;
1626         struct lr_server_data   *lsd = &tgt->lut_lsd;
1627         struct lsd_client_data  *lcd = NULL;
1628         struct tg_export_data   *ted;
1629         int                      cl_idx;
1630         int                      rc = 0;
1631         loff_t                   off = lsd->lsd_client_start;
1632         __u32                    generation = 0;
1633         struct cfs_hash         *hash = NULL;
1634
1635         ENTRY;
1636
1637         if (tgt->lut_bottom->dd_rdonly)
1638                 RETURN(0);
1639
1640         BUILD_BUG_ON(offsetof(struct lsd_client_data, lcd_padding) +
1641                      sizeof(lcd->lcd_padding) != LR_CLIENT_SIZE);
1642
1643         OBD_ALLOC_PTR(lcd);
1644         if (lcd == NULL)
1645                 RETURN(-ENOMEM);
1646
1647         hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
1648         if (hash == NULL)
1649                 GOTO(err_out, rc = -ENODEV);
1650
1651         for (cl_idx = 0; off < last_size; cl_idx++) {
1652                 struct obd_export       *exp;
1653                 __u64                    last_transno;
1654
1655                 /* Don't assume off is incremented properly by
1656                  * read_record(), in case sizeof(*lcd)
1657                  * isn't the same as fsd->lsd_client_size.  */
1658                 off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
1659                 rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
1660                 if (rc) {
1661                         CERROR("%s: error reading last_rcvd %s idx %d off "
1662                                "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
1663                                cl_idx, off, rc);
1664                         rc = 0;
1665                         break; /* read error shouldn't cause startup to fail */
1666                 }
1667
1668                 if (lcd->lcd_uuid[0] == '\0') {
1669                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
1670                                cl_idx);
1671                         continue;
1672                 }
1673
1674                 last_transno = lcd_last_transno(lcd);
1675
1676                 /* These exports are cleaned up by disconnect, so they
1677                  * need to be set up like real exports as connect does.
1678                  */
1679                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: %llu"
1680                        " srv lr: %llu lx: %llu gen %u\n", lcd->lcd_uuid,
1681                        cl_idx, last_transno, lsd->lsd_last_transno,
1682                        lcd_last_xid(lcd), lcd->lcd_generation);
1683
1684                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
1685                 if (IS_ERR(exp)) {
1686                         if (PTR_ERR(exp) == -EALREADY) {
1687                                 /* export already exists, zero out this one */
1688                                 CERROR("%s: Duplicate export %s!\n",
1689                                        tgt_name(tgt), lcd->lcd_uuid);
1690                                 continue;
1691                         }
1692                         GOTO(err_out, rc = PTR_ERR(exp));
1693                 }
1694
1695                 ted = &exp->exp_target_data;
1696                 *ted->ted_lcd = *lcd;
1697
1698                 rc = tgt_client_add(env, exp, cl_idx);
1699                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
1700                 /* VBR: set export last committed version */
1701                 exp->exp_last_committed = last_transno;
1702                 spin_lock(&exp->exp_lock);
1703                 exp->exp_connecting = 0;
1704                 exp->exp_in_recovery = 0;
1705                 spin_unlock(&exp->exp_lock);
1706                 atomic_inc(&obd->obd_max_recoverable_clients);
1707
1708                 if (tgt_is_multimodrpcs_record(tgt, lcd)) {
1709                         atomic_inc(&tgt->lut_num_clients);
1710
1711                         /* compute the highest valid client generation */
1712                         generation = max(generation, lcd->lcd_generation);
1713                         /* fill client_generation <-> export hash table */
1714                         rc = cfs_hash_add_unique(hash, &lcd->lcd_generation,
1715                                                  &exp->exp_gen_hash);
1716                         if (rc != 0) {
1717                                 CERROR("%s: duplicate export for client "
1718                                        "generation %u\n",
1719                                        tgt_name(tgt), lcd->lcd_generation);
1720                                 class_export_put(exp);
1721                                 GOTO(err_out, rc);
1722                         }
1723                 }
1724
1725                 class_export_put(exp);
1726
1727                 rc = rev_import_init(exp);
1728                 if (rc != 0) {
1729                         class_unlink_export(exp);
1730                         GOTO(err_out, rc);
1731                 }
1732
1733                 /* Need to check last_rcvd even for duplicated exports. */
1734                 CDEBUG(D_OTHER, "client at idx %d has last_transno = %llu\n",
1735                        cl_idx, last_transno);
1736
1737                 spin_lock(&tgt->lut_translock);
1738                 tgt->lut_last_transno = max(last_transno,
1739                                             tgt->lut_last_transno);
1740                 spin_unlock(&tgt->lut_translock);
1741         }
1742
1743         /* record highest valid client generation */
1744         atomic_set(&tgt->lut_client_generation, generation);
1745
1746 err_out:
1747         if (hash != NULL)
1748                 cfs_hash_putref(hash);
1749         OBD_FREE_PTR(lcd);
1750         RETURN(rc);
1751 }
1752
1753 struct server_compat_data {
1754         __u32 rocompat;
1755         __u32 incompat;
1756         __u32 rocinit;
1757         __u32 incinit;
1758 };
1759
1760 static struct server_compat_data tgt_scd[] = {
1761         [LDD_F_SV_TYPE_MDT] = {
1762                 .rocompat = OBD_ROCOMPAT_LOVOBJID,
1763                 .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1764                             OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
1765                             OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI |
1766                             OBD_INCOMPAT_MULTI_RPCS,
1767                 .rocinit = OBD_ROCOMPAT_LOVOBJID,
1768                 .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1769                            OBD_INCOMPAT_MULTI_OI,
1770         },
1771         [LDD_F_SV_TYPE_OST] = {
1772                 .rocompat = OBD_ROCOMPAT_IDX_IN_IDIF,
1773                 .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
1774                             OBD_INCOMPAT_FID,
1775                 .rocinit = OBD_ROCOMPAT_IDX_IN_IDIF,
1776                 .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
1777         }
1778 };
1779
1780 int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
1781 {
1782         struct tgt_thread_info          *tti = tgt_th_info(env);
1783         struct lr_server_data           *lsd = &tgt->lut_lsd;
1784         unsigned long                    last_rcvd_size;
1785         __u32                            index;
1786         int                              rc, type;
1787
1788         rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr);
1789         if (rc)
1790                 RETURN(rc);
1791
1792         last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
1793
1794         /* ensure padding in the struct is the correct size */
1795         BUILD_BUG_ON(offsetof(struct lr_server_data, lsd_padding) +
1796                      sizeof(lsd->lsd_padding) != LR_SERVER_SIZE);
1797
1798         rc = server_name2index(tgt_name(tgt), &index, NULL);
1799         if (rc < 0) {
1800                 CERROR("%s: Can not get index from name: rc = %d\n",
1801                        tgt_name(tgt), rc);
1802                 RETURN(rc);
1803         }
1804         /* server_name2index() returns type */
1805         type = rc;
1806         if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
1807                 CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
1808                 RETURN(-EINVAL);
1809         }
1810
1811         /* last_rcvd on OST doesn't provide reconstruct support because there
1812          * may be up to 8 in-flight write requests per single slot in
1813          * last_rcvd client data
1814          */
1815         tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
1816
1817         if (last_rcvd_size == 0) {
1818                 LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
1819
1820                 memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
1821                        sizeof(lsd->lsd_uuid));
1822                 lsd->lsd_last_transno = 0;
1823                 lsd->lsd_mount_count = 0;
1824                 lsd->lsd_server_size = LR_SERVER_SIZE;
1825                 lsd->lsd_client_start = LR_CLIENT_START;
1826                 lsd->lsd_client_size = LR_CLIENT_SIZE;
1827                 lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
1828                 lsd->lsd_osd_index = index;
1829                 lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
1830                 lsd->lsd_feature_incompat = tgt_scd[type].incinit;
1831         } else {
1832                 rc = tgt_server_data_read(env, tgt);
1833                 if (rc) {
1834                         CERROR("%s: error reading LAST_RCVD: rc= %d\n",
1835                                tgt_name(tgt), rc);
1836                         RETURN(rc);
1837                 }
1838                 if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
1839                         if (tgt->lut_bottom->dd_rdonly) {
1840                                 /* Such difference may be caused by mounting
1841                                  * up snapshot with new fsname under rd_only
1842                                  * mode. But even if it was NOT, it will not
1843                                  * damage the system because of "rd_only". */
1844                                 memcpy(lsd->lsd_uuid,
1845                                        tgt->lut_obd->obd_uuid.uuid,
1846                                        sizeof(lsd->lsd_uuid));
1847                         } else {
1848                                 LCONSOLE_ERROR_MSG(0x157, "Trying to start "
1849                                                    "OBD %s using the wrong "
1850                                                    "disk %s. Were the /dev/ "
1851                                                    "assignments rearranged?\n",
1852                                                    tgt->lut_obd->obd_uuid.uuid,
1853                                                    lsd->lsd_uuid);
1854                                 RETURN(-EINVAL);
1855                         }
1856                 }
1857
1858                 if (lsd->lsd_osd_index != index) {
1859                         LCONSOLE_ERROR_MSG(0x157,
1860                                            "%s: index %d in last rcvd is different with the index %d in config log, It might be disk corruption!\n",
1861                                            tgt_name(tgt),
1862                                            lsd->lsd_osd_index, index);
1863                         RETURN(-EINVAL);
1864                 }
1865         }
1866
1867         if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
1868                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
1869                        tgt_name(tgt),
1870                        lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
1871                 RETURN(-EINVAL);
1872         }
1873
1874         if (type == LDD_F_SV_TYPE_MDT)
1875                 lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
1876
1877         if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
1878                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
1879                        tgt_name(tgt),
1880                        lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
1881                 RETURN(-EINVAL);
1882         }
1883         /** Interop: evict all clients at first boot with 1.8 last_rcvd */
1884         if (type == LDD_F_SV_TYPE_MDT &&
1885             !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
1886                 if (last_rcvd_size > lsd->lsd_client_start) {
1887                         LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
1888                                       "remove all clients for interop needs\n",
1889                                       tgt_name(tgt));
1890                         rc = tgt_truncate_last_rcvd(env, tgt,
1891                                                     lsd->lsd_client_start);
1892                         if (rc)
1893                                 RETURN(rc);
1894                         last_rcvd_size = lsd->lsd_client_start;
1895                 }
1896                 /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
1897                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
1898         }
1899
1900         spin_lock(&tgt->lut_translock);
1901         tgt->lut_last_transno = lsd->lsd_last_transno;
1902         spin_unlock(&tgt->lut_translock);
1903
1904         lsd->lsd_mount_count++;
1905
1906         CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
1907         CDEBUG(D_INODE, "%s: server last_transno: %llu\n",
1908                tgt_name(tgt), tgt->lut_last_transno);
1909         CDEBUG(D_INODE, "%s: server mount_count: %llu\n",
1910                tgt_name(tgt), lsd->lsd_mount_count);
1911         CDEBUG(D_INODE, "%s: server data size: %u\n",
1912                tgt_name(tgt), lsd->lsd_server_size);
1913         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
1914                tgt_name(tgt), lsd->lsd_client_start);
1915         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
1916                tgt_name(tgt), lsd->lsd_client_size);
1917         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
1918                tgt_name(tgt), last_rcvd_size);
1919         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
1920                tgt_name(tgt), lsd->lsd_subdir_count);
1921         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
1922                last_rcvd_size <= lsd->lsd_client_start ? 0 :
1923                (last_rcvd_size - lsd->lsd_client_start) /
1924                 lsd->lsd_client_size);
1925         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
1926
1927         if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
1928             lsd->lsd_client_size == 0) {
1929                 CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
1930                 RETURN(-EINVAL);
1931         }
1932
1933         if (!tgt->lut_obd->obd_replayable)
1934                 CWARN("%s: recovery support OFF\n", tgt_name(tgt));
1935
1936         rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
1937         if (rc < 0)
1938                 GOTO(err_client, rc);
1939
1940         spin_lock(&tgt->lut_translock);
1941         /* obd_last_committed is used for compatibility
1942          * with other lustre recovery code */
1943         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
1944         spin_unlock(&tgt->lut_translock);
1945
1946         obd2obt(tgt->lut_obd)->obt_mount_count = lsd->lsd_mount_count;
1947         obd2obt(tgt->lut_obd)->obt_instance = (__u32)lsd->lsd_mount_count;
1948
1949         /* save it, so mount count and last_transno is current */
1950         rc = tgt_server_data_update(env, tgt, 0);
1951         if (rc < 0)
1952                 GOTO(err_client, rc);
1953
1954         RETURN(0);
1955
1956 err_client:
1957         class_disconnect_exports(tgt->lut_obd);
1958         return rc;
1959 }
1960
1961 /* add credits for last_rcvd update */
1962 int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
1963                      void *cookie)
1964 {
1965         struct lu_target        *tgt = cookie;
1966         struct tgt_session_info *tsi;
1967         struct tgt_thread_info  *tti = tgt_th_info(env);
1968         struct dt_object        *dto;
1969         int                      rc;
1970
1971         /* For readonly case, the caller should have got failure
1972          * when start the transaction. If the logic comes here,
1973          * there must be something wrong. */
1974         if (unlikely(tgt->lut_bottom->dd_rdonly)) {
1975                 dump_stack();
1976                 LBUG();
1977         }
1978
1979         /* if there is no session, then this transaction is not result of
1980          * request processing but some local operation */
1981         if (env->le_ses == NULL)
1982                 return 0;
1983
1984         LASSERT(tgt->lut_last_rcvd);
1985         tsi = tgt_ses_info(env);
1986         /* OFD may start transaction without export assigned */
1987         if (tsi->tsi_exp == NULL)
1988                 return 0;
1989
1990         if (tgt_is_multimodrpcs_client(tsi->tsi_exp)) {
1991                 /*
1992                  * Use maximum possible file offset for declaration to ensure
1993                  * ZFS will reserve enough credits for a write anywhere in this
1994                  * file, since we don't know where in the file the write will be
1995                  * because a replay slot has not been assigned.  This should be
1996                  * replaced by dmu_tx_hold_append() when available.
1997                  */
1998                 tti->tti_buf.lb_buf = NULL;
1999                 tti->tti_buf.lb_len = sizeof(struct lsd_reply_data);
2000                 dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
2001                 rc = dt_declare_record_write(env, dto, &tti->tti_buf, -1, th);
2002                 if (rc)
2003                         return rc;
2004         } else {
2005                 dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
2006                 tti_buf_lcd(tti);
2007                 tti->tti_off = tsi->tsi_exp->exp_target_data.ted_lr_off;
2008                 rc = dt_declare_record_write(env, dto, &tti->tti_buf,
2009                                              tti->tti_off, th);
2010                 if (rc)
2011                         return rc;
2012         }
2013
2014         if (tsi->tsi_vbr_obj != NULL &&
2015             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
2016                 dto = dt_object_locate(tsi->tsi_vbr_obj, th->th_dev);
2017                 rc = dt_declare_version_set(env, dto, th);
2018         }
2019
2020         return rc;
2021 }
2022
2023 /* Update last_rcvd records with latests transaction data */
2024 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
2025                     void *cookie)
2026 {
2027         struct lu_target        *tgt = cookie;
2028         struct tgt_session_info *tsi;
2029         struct tgt_thread_info  *tti = tgt_th_info(env);
2030         struct dt_object        *obj = NULL;
2031         int                      rc;
2032         bool                     echo_client;
2033
2034         if (env->le_ses == NULL)
2035                 return 0;
2036
2037         tsi = tgt_ses_info(env);
2038         /* OFD may start transaction without export assigned */
2039         if (tsi->tsi_exp == NULL)
2040                 return 0;
2041
2042         echo_client = (tgt_ses_req(tsi) == NULL && tsi->tsi_xid == 0);
2043
2044         if (tsi->tsi_has_trans && !echo_client && !tsi->tsi_batch_env) {
2045                 if (!tsi->tsi_mult_trans) {
2046                         CDEBUG(D_HA, "More than one transaction %llu\n",
2047                                tti->tti_transno);
2048                         /**
2049                          * if RPC handler sees unexpected multiple last_rcvd
2050                          * updates with transno, then it is better to return
2051                          * the latest transaction number to the client.
2052                          * In that case replay may fail if part of operation
2053                          * was committed and can't be re-applied easily. But
2054                          * that is better than report the first transno, in
2055                          * which case partially committed operation would be
2056                          * considered as finished so never replayed causing
2057                          * data loss.
2058                          */
2059                 }
2060                 /* we need new transno to be assigned */
2061                 tti->tti_transno = 0;
2062         }
2063
2064         if (!th->th_result)
2065                 tsi->tsi_has_trans++;
2066
2067         if (tsi->tsi_vbr_obj != NULL &&
2068             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
2069                 obj = tsi->tsi_vbr_obj;
2070         }
2071
2072         if (unlikely(echo_client)) /* echo client special case */
2073                 rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
2074                                                tsi->tsi_exp);
2075         else
2076                 rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
2077                                           tgt_ses_req(tsi));
2078         return rc;
2079 }
2080
2081 int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
2082 {
2083         struct tgt_thread_info  *tti = tgt_th_info(env);
2084         struct lsd_reply_data   *lrd = &tti->tti_lrd;
2085         unsigned long            reply_data_size;
2086         int                      rc;
2087         struct lsd_reply_header *lrh = NULL;
2088         struct tg_reply_data    *trd = NULL;
2089         int                      idx;
2090         loff_t                   off;
2091         struct cfs_hash         *hash = NULL;
2092         struct obd_export       *exp;
2093         struct tg_export_data   *ted;
2094         int                      reply_data_recovered = 0;
2095
2096         rc = dt_attr_get(env, tgt->lut_reply_data, &tti->tti_attr);
2097         if (rc)
2098                 GOTO(out, rc);
2099         reply_data_size = (unsigned long)tti->tti_attr.la_size;
2100
2101         OBD_ALLOC_PTR(lrh);
2102         if (lrh == NULL)
2103                 GOTO(out, rc = -ENOMEM);
2104
2105         if (reply_data_size == 0) {
2106                 CDEBUG(D_INFO, "%s: new reply_data file, initializing\n",
2107                        tgt_name(tgt));
2108                 lrh->lrh_magic = LRH_MAGIC;
2109                 lrh->lrh_header_size = sizeof(struct lsd_reply_header);
2110                 lrh->lrh_reply_size = sizeof(struct lsd_reply_data);
2111                 rc = tgt_reply_header_write(env, tgt, lrh);
2112                 if (rc) {
2113                         CERROR("%s: error writing %s: rc = %d\n",
2114                                tgt_name(tgt), REPLY_DATA, rc);
2115                         GOTO(out, rc);
2116                 }
2117         } else {
2118                 rc = tgt_reply_header_read(env, tgt, lrh);
2119                 if (rc) {
2120                         CERROR("%s: error reading %s: rc = %d\n",
2121                                tgt_name(tgt), REPLY_DATA, rc);
2122                         GOTO(out, rc);
2123                 }
2124                 if (lrh->lrh_magic != LRH_MAGIC ||
2125                     lrh->lrh_header_size != sizeof(struct lsd_reply_header) ||
2126                     lrh->lrh_reply_size != sizeof(struct lsd_reply_data)) {
2127                         CERROR("%s: invalid header in %s\n",
2128                                tgt_name(tgt), REPLY_DATA);
2129                         GOTO(out, rc = -EINVAL);
2130                 }
2131
2132                 hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
2133                 if (hash == NULL)
2134                         GOTO(out, rc = -ENODEV);
2135
2136                 OBD_ALLOC_PTR(trd);
2137                 if (trd == NULL)
2138                         GOTO(out, rc = -ENOMEM);
2139
2140                 /* Load reply_data from disk */
2141                 for (idx = 0, off = sizeof(struct lsd_reply_header);
2142                      off < reply_data_size;
2143                      idx++, off += sizeof(struct lsd_reply_data)) {
2144                         rc = tgt_reply_data_read(env, tgt, lrd, off);
2145                         if (rc) {
2146                                 CERROR("%s: error reading %s: rc = %d\n",
2147                                        tgt_name(tgt), REPLY_DATA, rc);
2148                                 GOTO(out, rc);
2149                         }
2150
2151                         exp = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
2152                         if (exp == NULL) {
2153                                 /* old reply data from a disconnected client */
2154                                 continue;
2155                         }
2156                         ted = &exp->exp_target_data;
2157                         mutex_lock(&ted->ted_lcd_lock);
2158
2159                         /* create in-memory reply_data and link it to
2160                          * target export's reply list */
2161                         rc = tgt_set_reply_slot(tgt, idx);
2162                         if (rc != 0) {
2163                                 mutex_unlock(&ted->ted_lcd_lock);
2164                                 GOTO(out, rc);
2165                         }
2166                         trd->trd_reply = *lrd;
2167                         trd->trd_pre_versions[0] = 0;
2168                         trd->trd_pre_versions[1] = 0;
2169                         trd->trd_pre_versions[2] = 0;
2170                         trd->trd_pre_versions[3] = 0;
2171                         trd->trd_index = idx;
2172                         trd->trd_tag = 0;
2173                         fid_zero(&trd->trd_object);
2174                         list_add(&trd->trd_list, &ted->ted_reply_list);
2175                         ted->ted_reply_cnt++;
2176                         if (ted->ted_reply_cnt > ted->ted_reply_max)
2177                                 ted->ted_reply_max = ted->ted_reply_cnt;
2178
2179                         CDEBUG(D_HA, "%s: restore reply %p: xid %llu, "
2180                                "transno %llu, client gen %u, slot idx %d\n",
2181                                tgt_name(tgt), trd, lrd->lrd_xid,
2182                                lrd->lrd_transno, lrd->lrd_client_gen,
2183                                trd->trd_index);
2184
2185                         /* update export last committed transation */
2186                         exp->exp_last_committed = max(exp->exp_last_committed,
2187                                                       lrd->lrd_transno);
2188                         /* Update lcd_last_transno as well for check in
2189                          * tgt_release_reply_data() or the latest client
2190                          * transno can be lost.
2191                          */
2192                         ted->ted_lcd->lcd_last_transno =
2193                                 max(ted->ted_lcd->lcd_last_transno,
2194                                     exp->exp_last_committed);
2195
2196                         mutex_unlock(&ted->ted_lcd_lock);
2197                         class_export_put(exp);
2198
2199                         /* update target last committed transaction */
2200                         spin_lock(&tgt->lut_translock);
2201                         tgt->lut_last_transno = max(tgt->lut_last_transno,
2202                                                     lrd->lrd_transno);
2203                         spin_unlock(&tgt->lut_translock);
2204
2205                         reply_data_recovered++;
2206
2207                         OBD_ALLOC_PTR(trd);
2208                         if (trd == NULL)
2209                                 GOTO(out, rc = -ENOMEM);
2210                 }
2211                 CDEBUG(D_INFO, "%s: %d reply data have been recovered\n",
2212                        tgt_name(tgt), reply_data_recovered);
2213         }
2214
2215         spin_lock(&tgt->lut_translock);
2216         /* obd_last_committed is used for compatibility
2217          * with other lustre recovery code */
2218         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
2219         spin_unlock(&tgt->lut_translock);
2220
2221         rc = 0;
2222
2223 out:
2224         if (hash != NULL)
2225                 cfs_hash_putref(hash);
2226         if (trd != NULL)
2227                 OBD_FREE_PTR(trd);
2228         if (lrh != NULL)
2229                 OBD_FREE_PTR(lrh);
2230         return rc;
2231 }
2232
2233 static int tgt_check_lookup_req(struct ptlrpc_request *req, int lookup,
2234                                 struct tg_reply_data *trd)
2235 {
2236         struct tg_export_data *ted = &req->rq_export->exp_target_data;
2237         struct lu_target *lut = class_exp2tgt(req->rq_export);
2238         __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
2239         int rc = 0;
2240         struct tg_reply_data *reply;
2241         bool check_increasing;
2242
2243         if (tag == 0)
2244                 return 0;
2245
2246         check_increasing = tgt_is_increasing_xid_client(req->rq_export) &&
2247                            !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2248         if (!lookup && !check_increasing)
2249                 return 0;
2250
2251         list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
2252                 if (lookup && reply->trd_reply.lrd_xid == req->rq_xid) {
2253                         rc = 1;
2254                         if (trd != NULL)
2255                                 *trd = *reply;
2256                         break;
2257                 } else if (check_increasing && reply->trd_tag == tag &&
2258                            reply->trd_reply.lrd_xid > req->rq_xid) {
2259                         rc = -EPROTO;
2260                         CERROR("%s: busy tag=%u req_xid=%llu, trd=%p: xid=%llu transno=%llu client_gen=%u slot_idx=%d: rc = %d\n",
2261                                tgt_name(lut), tag, req->rq_xid, trd,
2262                                reply->trd_reply.lrd_xid,
2263                                reply->trd_reply.lrd_transno,
2264                                reply->trd_reply.lrd_client_gen,
2265                                reply->trd_index, rc);
2266                         break;
2267                 }
2268         }
2269
2270         return rc;
2271 }
2272
2273 /* Look for a reply data matching specified request @req
2274  * A copy is returned in @trd if the pointer is not NULL
2275  */
2276 int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
2277 {
2278         struct tg_export_data *ted = &req->rq_export->exp_target_data;
2279         int found = 0;
2280         bool not_replay = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2281
2282         mutex_lock(&ted->ted_lcd_lock);
2283         if (not_replay && req->rq_xid <= req->rq_export->exp_last_xid) {
2284                 /* A check for the last_xid is needed here in case there is
2285                  * no reply data is left in the list. It may happen if another
2286                  * RPC on another slot increased the last_xid between our
2287                  * process_req_last_xid & tgt_lookup_reply calls */
2288                 found = -EPROTO;
2289         } else {
2290                 found = tgt_check_lookup_req(req, 1, trd);
2291         }
2292         mutex_unlock(&ted->ted_lcd_lock);
2293
2294         CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d last_xid %llu\n",
2295                tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, found,
2296                req->rq_export->exp_last_xid);
2297
2298         return found;
2299 }
2300 EXPORT_SYMBOL(tgt_lookup_reply);
2301
2302 int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
2303 {
2304         struct tg_export_data   *ted = &exp->exp_target_data;
2305         struct lu_target        *lut = class_exp2tgt(exp);
2306         struct tg_reply_data    *trd, *tmp;
2307
2308
2309         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
2310                 if (trd->trd_reply.lrd_xid > rcvd_xid)
2311                         continue;
2312                 ted->ted_release_xid++;
2313                 tgt_release_reply_data(lut, ted, trd);
2314         }
2315
2316         return 0;
2317 }
2318
2319 int tgt_handle_tag(struct ptlrpc_request *req)
2320 {
2321         return tgt_check_lookup_req(req, 0, NULL);
2322 }
2323