Whamcloud - gitweb
b0f200c39dec483fff251542fbbf12192fdbcf58
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * Lustre Unified Target
32  * These are common function to work with last_received file
33  *
34  * Author: Mikhail Pershin <mike.pershin@intel.com>
35  */
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_fid.h>
39
40 #include "tgt_internal.h"
41
/**
 * Version recovery epoch.
 *
 * Bit position of the boot epoch inside a 64-bit transno: the epoch is
 * obtained by shifting a transno right by this many bits, and a new epoch
 * starts at (epoch << LR_EPOCH_BITS).
 */
#define LR_EPOCH_BITS   32
44
45 /* Allocate a bitmap for a chunk of reply data slots */
46 static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
47 {
48         unsigned long *bm;
49
50         OBD_ALLOC_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
51                         sizeof(long));
52         if (bm == NULL)
53                 return -ENOMEM;
54
55         spin_lock(&lut->lut_client_bitmap_lock);
56
57         if (lut->lut_reply_bitmap[chunk] != NULL) {
58                 /* someone else already allocated the bitmap for this chunk */
59                 spin_unlock(&lut->lut_client_bitmap_lock);
60                 OBD_FREE_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
61                          sizeof(long));
62                 return 0;
63         }
64
65         lut->lut_reply_bitmap[chunk] = bm;
66
67         spin_unlock(&lut->lut_client_bitmap_lock);
68
69         return 0;
70 }
71
72 /* Look for an available reply data slot in the bitmap
73  * of the target @lut
74  * Allocate bitmap chunk when first used
75  * XXX algo could be improved if this routine limits performance
76  */
77 static int tgt_find_free_reply_slot(struct lu_target *lut)
78 {
79         unsigned long *bmp;
80         int chunk = 0;
81         int rc;
82         int b;
83
84         for (chunk = 0; chunk < LUT_REPLY_SLOTS_MAX_CHUNKS; chunk++) {
85                 /* allocate the bitmap chunk if necessary */
86                 if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
87                         rc = tgt_bitmap_chunk_alloc(lut, chunk);
88                         if (rc != 0)
89                                 return rc;
90                 }
91                 bmp = lut->lut_reply_bitmap[chunk];
92
93                 /* look for an available slot in this chunk */
94                 do {
95                         b = find_first_zero_bit(bmp, LUT_REPLY_SLOTS_PER_CHUNK);
96                         if (b >= LUT_REPLY_SLOTS_PER_CHUNK)
97                                 break;
98
99                         /* found one */
100                         if (test_and_set_bit(b, bmp) == 0)
101                                 return chunk * LUT_REPLY_SLOTS_PER_CHUNK + b;
102                 } while (true);
103         }
104
105         return -ENOSPC;
106 }
107
108 /* Mark the reply data slot @idx 'used' in the corresponding bitmap chunk
109  * of the target @lut
110  * Allocate the bitmap chunk if necessary
111  */
112 static int tgt_set_reply_slot(struct lu_target *lut, int idx)
113 {
114         int chunk;
115         int b;
116         int rc;
117
118         chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
119         b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
120
121         LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
122         LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
123
124         /* allocate the bitmap chunk if necessary */
125         if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
126                 rc = tgt_bitmap_chunk_alloc(lut, chunk);
127                 if (rc != 0)
128                         return rc;
129         }
130
131         /* mark the slot 'used' in this chunk */
132         if (test_and_set_bit(b, lut->lut_reply_bitmap[chunk]) != 0) {
133                 CERROR("%s: slot %d already set in bitmap\n",
134                        tgt_name(lut), idx);
135                 return -EALREADY;
136         }
137
138         return 0;
139 }
140
141
142 /* Mark the reply data slot @idx 'unused' in the corresponding bitmap chunk
143  * of the target @lut
144  */
145 static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
146 {
147         int chunk;
148         int b;
149
150         if (lut->lut_obd->obd_stopping)
151                 /*
152                  * in case of failover keep the bit set in order to
153                  * avoid overwriting slots in reply_data which might
154                  * be required by resent rpcs
155                  */
156                 return 0;
157         chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
158         b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
159
160         LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
161         LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
162
163         if (lut->lut_reply_bitmap[chunk] == NULL) {
164                 CERROR("%s: slot %d not allocated\n",
165                        tgt_name(lut), idx);
166                 return -ENOENT;
167         }
168
169         if (test_and_clear_bit(b, lut->lut_reply_bitmap[chunk]) == 0) {
170                 CERROR("%s: slot %d already clear in bitmap\n",
171                        tgt_name(lut), idx);
172                 return -EALREADY;
173         }
174
175         return 0;
176 }
177
178
179 /* Read header of reply_data file of target @tgt into structure @lrh */
180 static int tgt_reply_header_read(const struct lu_env *env,
181                                  struct lu_target *tgt,
182                                  struct lsd_reply_header *lrh)
183 {
184         int                      rc;
185         struct lsd_reply_header  buf;
186         struct tgt_thread_info  *tti = tgt_th_info(env);
187
188         tti->tti_off = 0;
189         tti->tti_buf.lb_buf = &buf;
190         tti->tti_buf.lb_len = sizeof(buf);
191
192         rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
193                             &tti->tti_off);
194         if (rc != 0)
195                 return rc;
196
197         lrh->lrh_magic = le32_to_cpu(buf.lrh_magic);
198         lrh->lrh_header_size = le32_to_cpu(buf.lrh_header_size);
199         lrh->lrh_reply_size = le32_to_cpu(buf.lrh_reply_size);
200
201         CDEBUG(D_HA, "%s: read %s header. magic=0x%08x "
202                "header_size=%d reply_size=%d\n",
203                 tgt->lut_obd->obd_name, REPLY_DATA,
204                 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
205
206         return 0;
207 }
208
209 /* Write header into replay_data file of target @tgt from structure @lrh */
210 static int tgt_reply_header_write(const struct lu_env *env,
211                                   struct lu_target *tgt,
212                                   struct lsd_reply_header *lrh)
213 {
214         int                      rc;
215         struct lsd_reply_header  buf;
216         struct tgt_thread_info  *tti = tgt_th_info(env);
217         struct thandle          *th;
218         struct dt_object        *dto;
219
220         CDEBUG(D_HA, "%s: write %s header. magic=0x%08x "
221                "header_size=%d reply_size=%d\n",
222                 tgt->lut_obd->obd_name, REPLY_DATA,
223                 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
224
225         if (tgt->lut_bottom->dd_rdonly)
226                 RETURN(0);
227
228         buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
229         buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
230         buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
231
232         th = dt_trans_create(env, tgt->lut_bottom);
233         if (IS_ERR(th))
234                 return PTR_ERR(th);
235         th->th_sync = 1;
236
237         tti->tti_off = 0;
238         tti->tti_buf.lb_buf = &buf;
239         tti->tti_buf.lb_len = sizeof(buf);
240
241         rc = dt_declare_record_write(env, tgt->lut_reply_data,
242                                      &tti->tti_buf, tti->tti_off, th);
243         if (rc)
244                 GOTO(out, rc);
245
246         rc = dt_trans_start(env, tgt->lut_bottom, th);
247         if (rc)
248                 GOTO(out, rc);
249
250         dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
251         rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
252 out:
253         dt_trans_stop(env, tgt->lut_bottom, th);
254         return rc;
255 }
256
257 /* Write the reply data @lrd into reply_data file of target @tgt
258  * at offset @off
259  */
260 static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
261                                 struct lsd_reply_data *lrd, loff_t off,
262                                 struct thandle *th)
263 {
264         struct tgt_thread_info  *tti = tgt_th_info(env);
265         struct dt_object        *dto;
266         struct lsd_reply_data   *buf = &tti->tti_lrd;
267
268         lrd->lrd_result = ptlrpc_status_hton(lrd->lrd_result);
269
270         buf->lrd_transno         = cpu_to_le64(lrd->lrd_transno);
271         buf->lrd_xid             = cpu_to_le64(lrd->lrd_xid);
272         buf->lrd_data            = cpu_to_le64(lrd->lrd_data);
273         buf->lrd_result          = cpu_to_le32(lrd->lrd_result);
274         buf->lrd_client_gen      = cpu_to_le32(lrd->lrd_client_gen);
275
276         lrd->lrd_result = ptlrpc_status_ntoh(lrd->lrd_result);
277
278         tti->tti_off = off;
279         tti->tti_buf.lb_buf = buf;
280         tti->tti_buf.lb_len = sizeof(*buf);
281
282         dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
283         return dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
284 }
285
286 /* Read the reply data from reply_data file of target @tgt at offset @off
287  * into structure @lrd
288  */
289 static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt,
290                                struct lsd_reply_data *lrd, loff_t off)
291 {
292         int                      rc;
293         struct tgt_thread_info  *tti = tgt_th_info(env);
294         struct lsd_reply_data   *buf = &tti->tti_lrd;
295
296         tti->tti_off = off;
297         tti->tti_buf.lb_buf = buf;
298         tti->tti_buf.lb_len = sizeof(*buf);
299
300         rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
301                             &tti->tti_off);
302         if (rc != 0)
303                 return rc;
304
305         lrd->lrd_transno         = le64_to_cpu(buf->lrd_transno);
306         lrd->lrd_xid             = le64_to_cpu(buf->lrd_xid);
307         lrd->lrd_data            = le64_to_cpu(buf->lrd_data);
308         lrd->lrd_result          = le32_to_cpu(buf->lrd_result);
309         lrd->lrd_client_gen      = le32_to_cpu(buf->lrd_client_gen);
310
311         return 0;
312 }
313
314
315 /* Free the in-memory reply data structure @trd and release
316  * the corresponding slot in the reply_data file of target @lut
317  * Called with ted_lcd_lock held
318  */
319 static void tgt_free_reply_data(struct lu_target *lut,
320                                 struct tg_export_data *ted,
321                                 struct tg_reply_data *trd)
322 {
323         CDEBUG(D_TRACE, "%s: free reply data %p: xid %llu, transno %llu, "
324                "client gen %u, slot idx %d\n",
325                lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
326                trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
327                trd->trd_index);
328
329         LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
330
331         list_del(&trd->trd_list);
332         ted->ted_reply_cnt--;
333         if (lut != NULL && trd->trd_index != TRD_INDEX_MEMORY)
334                 tgt_clear_reply_slot(lut, trd->trd_index);
335         OBD_FREE_PTR(trd);
336 }
337
338 /* Release the reply data @trd from target @lut
339  * The reply data with the highest transno for this export
340  * is retained to ensure correctness of target recovery
341  * Called with ted_lcd_lock held
342  */
343 static void tgt_release_reply_data(struct lu_target *lut,
344                                    struct tg_export_data *ted,
345                                    struct tg_reply_data *trd)
346 {
347         CDEBUG(D_TRACE, "%s: release reply data %p: xid %llu, transno %llu, "
348                "client gen %u, slot idx %d\n",
349                lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
350                trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
351                trd->trd_index);
352
353         LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
354
355         /* Do not free the reply data corresponding to the
356          * highest transno of this export.
357          * This ensures on-disk reply data is kept and
358          * last committed transno can be restored from disk in case
359          * of target recovery
360          */
361         if (trd->trd_reply.lrd_transno == ted->ted_lcd->lcd_last_transno) {
362                 /* free previous retained reply */
363                 if (ted->ted_reply_last != NULL)
364                         tgt_free_reply_data(lut, ted, ted->ted_reply_last);
365                 /* retain the reply */
366                 list_del_init(&trd->trd_list);
367                 ted->ted_reply_last = trd;
368         } else {
369                 tgt_free_reply_data(lut, ted, trd);
370         }
371 }
372
373 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
374 {
375         tti->tti_buf.lb_buf = &tti->tti_lsd;
376         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
377         return &tti->tti_buf;
378 }
379
380 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
381 {
382         tti->tti_buf.lb_buf = &tti->tti_lcd;
383         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
384         return &tti->tti_buf;
385 }
386
387 /**
388  * Allocate in-memory data for client slot related to export.
389  */
390 int tgt_client_alloc(struct obd_export *exp)
391 {
392         ENTRY;
393         LASSERT(exp != exp->exp_obd->obd_self_export);
394
395         spin_lock_init(&exp->exp_target_data.ted_nodemap_lock);
396         INIT_LIST_HEAD(&exp->exp_target_data.ted_nodemap_member);
397         spin_lock_init(&exp->exp_target_data.ted_fmd_lock);
398         INIT_LIST_HEAD(&exp->exp_target_data.ted_fmd_list);
399
400         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
401         if (exp->exp_target_data.ted_lcd == NULL)
402                 RETURN(-ENOMEM);
403         /* Mark that slot is not yet valid, 0 doesn't work here */
404         exp->exp_target_data.ted_lr_idx = -1;
405         INIT_LIST_HEAD(&exp->exp_target_data.ted_reply_list);
406         mutex_init(&exp->exp_target_data.ted_lcd_lock);
407         RETURN(0);
408 }
409 EXPORT_SYMBOL(tgt_client_alloc);
410
411 /**
412  * Free in-memory data for client slot related to export.
413  */
414 void tgt_client_free(struct obd_export *exp)
415 {
416         struct tg_export_data   *ted = &exp->exp_target_data;
417         struct lu_target        *lut = class_exp2tgt(exp);
418         struct tg_reply_data    *trd, *tmp;
419
420         LASSERT(exp != exp->exp_obd->obd_self_export);
421
422         tgt_fmd_cleanup(exp);
423
424         /* free reply data */
425         mutex_lock(&ted->ted_lcd_lock);
426         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
427                 tgt_release_reply_data(lut, ted, trd);
428         }
429         if (ted->ted_reply_last != NULL) {
430                 tgt_free_reply_data(lut, ted, ted->ted_reply_last);
431                 ted->ted_reply_last = NULL;
432         }
433         mutex_unlock(&ted->ted_lcd_lock);
434
435         if (!hlist_unhashed(&exp->exp_gen_hash))
436                 cfs_hash_del(exp->exp_obd->obd_gen_hash,
437                              &ted->ted_lcd->lcd_generation,
438                              &exp->exp_gen_hash);
439
440         OBD_FREE_PTR(ted->ted_lcd);
441         ted->ted_lcd = NULL;
442
443         /* Target may have been freed (see LU-7430)
444          * Slot may be not yet assigned */
445         if (exp->exp_obd->u.obt.obt_magic != OBT_MAGIC ||
446             ted->ted_lr_idx < 0)
447                 return;
448
449         /* Clear bit when lcd is freed */
450         LASSERT(lut && lut->lut_client_bitmap);
451         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
452                 CERROR("%s: client %u bit already clear in bitmap\n",
453                        exp->exp_obd->obd_name, ted->ted_lr_idx);
454                 LBUG();
455         }
456
457         if (tgt_is_multimodrpcs_client(exp) && !exp->exp_obd->obd_stopping)
458                 atomic_dec(&lut->lut_num_clients);
459 }
460 EXPORT_SYMBOL(tgt_client_free);
461
462 static inline void tgt_check_lcd(const char *obd_name, int index,
463                                  struct lsd_client_data *lcd)
464 {
465         size_t uuid_size = sizeof(lcd->lcd_uuid);
466
467         if (strnlen((char*)lcd->lcd_uuid, uuid_size) == uuid_size) {
468                 lcd->lcd_uuid[uuid_size - 1] = '\0';
469
470                 LCONSOLE_ERROR("the client UUID (%s) on %s for exports stored in last_rcvd(index = %d) is bad!\n",
471                                lcd->lcd_uuid, obd_name, index);
472         }
473 }
474
475 static int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
476                                 struct lsd_client_data *lcd,
477                                 loff_t *off, int index)
478 {
479         struct tgt_thread_info  *tti = tgt_th_info(env);
480         int                      rc;
481
482         tti_buf_lcd(tti);
483         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
484         if (rc == 0) {
485                 tgt_check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
486                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
487                 lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
488                 lcd->lcd_last_close_result =
489                         ptlrpc_status_ntoh(lcd->lcd_last_close_result);
490         }
491
492         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = %llu"
493                ", last_xid = %llu, last_result = %u, last_data = %u, "
494                "last_close_transno = %llu, last_close_xid = %llu, "
495                "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
496                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
497                lcd->lcd_last_result, lcd->lcd_last_data,
498                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
499                lcd->lcd_last_close_result, rc);
500         return rc;
501 }
502
503 static int tgt_client_data_write(const struct lu_env *env,
504                                  struct lu_target *tgt,
505                                  struct lsd_client_data *lcd,
506                                  loff_t *off, struct thandle *th)
507 {
508         struct tgt_thread_info *tti = tgt_th_info(env);
509         struct dt_object        *dto;
510
511         lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
512         lcd->lcd_last_close_result =
513                 ptlrpc_status_hton(lcd->lcd_last_close_result);
514         lcd_cpu_to_le(lcd, &tti->tti_lcd);
515         tti_buf_lcd(tti);
516
517         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
518         return dt_record_write(env, dto, &tti->tti_buf, off, th);
519 }
520
/* Commit callback context allocated by tgt_new_client_cb_add() and freed
 * by tgt_cb_new_client().
 */
struct tgt_new_client_callback {
	/* embedded callback; the context is recovered with container_of() */
	struct dt_txn_commit_cb  lncc_cb;
	/* export taken with class_export_cb_get(), put by the callback */
	struct obd_export       *lncc_exp;
};
525
526 static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
527                               struct dt_txn_commit_cb *cb, int err)
528 {
529         struct tgt_new_client_callback *ccb;
530
531         ccb = container_of(cb, struct tgt_new_client_callback, lncc_cb);
532
533         LASSERT(ccb->lncc_exp->exp_obd);
534
535         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
536                ccb->lncc_exp->exp_obd->obd_name,
537                ccb->lncc_exp->exp_client_uuid.uuid);
538
539         spin_lock(&ccb->lncc_exp->exp_lock);
540
541         ccb->lncc_exp->exp_need_sync = 0;
542
543         spin_unlock(&ccb->lncc_exp->exp_lock);
544         class_export_cb_put(ccb->lncc_exp);
545
546         OBD_FREE_PTR(ccb);
547 }
548
549 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
550 {
551         struct tgt_new_client_callback  *ccb;
552         struct dt_txn_commit_cb         *dcb;
553         int                              rc;
554
555         OBD_ALLOC_PTR(ccb);
556         if (ccb == NULL)
557                 return -ENOMEM;
558
559         ccb->lncc_exp = class_export_cb_get(exp);
560
561         dcb = &ccb->lncc_cb;
562         dcb->dcb_func = tgt_cb_new_client;
563         INIT_LIST_HEAD(&dcb->dcb_linkage);
564         strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
565
566         rc = dt_trans_cb_add(th, dcb);
567         if (rc) {
568                 class_export_cb_put(exp);
569                 OBD_FREE_PTR(ccb);
570         }
571         return rc;
572 }
573
574 /**
575  * Update client data in last_rcvd
576  */
577 static int tgt_client_data_update(const struct lu_env *env,
578                                   struct obd_export *exp)
579 {
580         struct tg_export_data   *ted = &exp->exp_target_data;
581         struct lu_target        *tgt = class_exp2tgt(exp);
582         struct tgt_thread_info  *tti = tgt_th_info(env);
583         struct thandle          *th;
584         int                      rc = 0;
585
586         ENTRY;
587
588         if (unlikely(tgt == NULL)) {
589                 CDEBUG(D_ERROR, "%s: No target for connected export\n",
590                           class_exp2obd(exp)->obd_name);
591                 RETURN(-EINVAL);
592         }
593
594         if (tgt->lut_bottom->dd_rdonly)
595                 RETURN(0);
596
597         th = dt_trans_create(env, tgt->lut_bottom);
598         if (IS_ERR(th))
599                 RETURN(PTR_ERR(th));
600
601         tti_buf_lcd(tti);
602         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
603                                      &tti->tti_buf,
604                                      ted->ted_lr_off, th);
605         if (rc)
606                 GOTO(out, rc);
607
608         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
609         if (rc)
610                 GOTO(out, rc);
611
612         mutex_lock(&ted->ted_lcd_lock);
613
614         /*
615          * Until this operations will be committed the sync is needed
616          * for this export. This should be done _after_ starting the
617          * transaction so that many connecting clients will not bring
618          * server down with lots of sync writes.
619          */
620         rc = tgt_new_client_cb_add(th, exp);
621         if (rc) {
622                 /* can't add callback, do sync now */
623                 th->th_sync = 1;
624         } else {
625                 spin_lock(&exp->exp_lock);
626                 exp->exp_need_sync = 1;
627                 spin_unlock(&exp->exp_lock);
628         }
629
630         tti->tti_off = ted->ted_lr_off;
631         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
632
633         mutex_unlock(&ted->ted_lcd_lock);
634
635         EXIT;
636 out:
637         dt_trans_stop(env, tgt->lut_bottom, th);
638         CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
639                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
640                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
641
642         return rc;
643 }
644
645 static int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
646 {
647         struct tgt_thread_info  *tti = tgt_th_info(env);
648         int                      rc;
649
650         tti->tti_off = 0;
651         tti_buf_lsd(tti);
652         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
653                             &tti->tti_off);
654         if (rc == 0)
655                 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
656
657         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
658                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
659                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
660         return rc;
661 }
662
663 static int tgt_server_data_write(const struct lu_env *env,
664                                  struct lu_target *tgt, struct thandle *th)
665 {
666         struct tgt_thread_info  *tti = tgt_th_info(env);
667         struct dt_object        *dto;
668         int                      rc;
669
670         ENTRY;
671
672         tti->tti_off = 0;
673         tti_buf_lsd(tti);
674         lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
675
676         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
677         rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
678
679         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
680                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
681                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
682
683         RETURN(rc);
684 }
685
686 /**
687  * Update server data in last_rcvd
688  */
689 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
690                            int sync)
691 {
692         struct tgt_thread_info  *tti = tgt_th_info(env);
693         struct thandle          *th;
694         int                      rc = 0;
695
696         ENTRY;
697
698         CDEBUG(D_SUPER,
699                "%s: mount_count is %llu, last_transno is %llu\n",
700                tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
701                tgt->lut_last_transno);
702
703         /* Always save latest transno to keep it fresh */
704         spin_lock(&tgt->lut_translock);
705         tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
706         spin_unlock(&tgt->lut_translock);
707
708         if (tgt->lut_bottom->dd_rdonly)
709                 RETURN(0);
710
711         th = dt_trans_create(env, tgt->lut_bottom);
712         if (IS_ERR(th))
713                 RETURN(PTR_ERR(th));
714
715         th->th_sync = sync;
716
717         tti_buf_lsd(tti);
718         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
719                                      &tti->tti_buf, tti->tti_off, th);
720         if (rc)
721                 GOTO(out, rc);
722
723         rc = dt_trans_start(env, tgt->lut_bottom, th);
724         if (rc)
725                 GOTO(out, rc);
726
727         rc = tgt_server_data_write(env, tgt, th);
728 out:
729         dt_trans_stop(env, tgt->lut_bottom, th);
730
731         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
732                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
733                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
734         RETURN(rc);
735 }
736 EXPORT_SYMBOL(tgt_server_data_update);
737
738 static int tgt_truncate_last_rcvd(const struct lu_env *env,
739                                   struct lu_target *tgt, loff_t size)
740 {
741         struct dt_object *dt = tgt->lut_last_rcvd;
742         struct thandle   *th;
743         struct lu_attr    attr;
744         int               rc;
745
746         ENTRY;
747
748         if (tgt->lut_bottom->dd_rdonly)
749                 RETURN(0);
750
751         attr.la_size = size;
752         attr.la_valid = LA_SIZE;
753
754         th = dt_trans_create(env, tgt->lut_bottom);
755         if (IS_ERR(th))
756                 RETURN(PTR_ERR(th));
757         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
758         if (rc)
759                 GOTO(cleanup, rc);
760         rc = dt_declare_attr_set(env, dt, &attr, th);
761         if (rc)
762                 GOTO(cleanup, rc);
763         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
764         if (rc)
765                 GOTO(cleanup, rc);
766
767         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th);
768         if (rc == 0)
769                 rc = dt_attr_set(env, dt, &attr, th);
770
771 cleanup:
772         dt_trans_stop(env, tgt->lut_bottom, th);
773
774         RETURN(rc);
775 }
776
777 static void tgt_client_epoch_update(const struct lu_env *env,
778                                     struct obd_export *exp)
779 {
780         struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
781         struct lu_target        *tgt = class_exp2tgt(exp);
782
783         LASSERT(tgt && tgt->lut_bottom);
784         /** VBR: set client last_epoch to current epoch */
785         if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
786                 return;
787         lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
788         tgt_client_data_update(env, exp);
789 }
790
791 /**
792  * Update boot epoch when recovery ends
793  */
794 void tgt_boot_epoch_update(struct lu_target *tgt)
795 {
796         struct lu_env            env;
797         struct ptlrpc_request   *req;
798         __u32                    start_epoch;
799         LIST_HEAD(client_list);
800         int                      rc;
801
802         if (tgt->lut_obd->obd_stopping)
803                 return;
804
805         rc = lu_env_init(&env, LCT_DT_THREAD);
806         if (rc) {
807                 CERROR("%s: can't initialize environment: rc = %d\n",
808                         tgt->lut_obd->obd_name, rc);
809                 return;
810         }
811
812         spin_lock(&tgt->lut_translock);
813         start_epoch = (tgt->lut_last_transno >> LR_EPOCH_BITS) + 1;
814         tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
815         tgt->lut_lsd.lsd_start_epoch = start_epoch;
816         spin_unlock(&tgt->lut_translock);
817
818         /**
819          * The recovery is not yet finished and final queue can still be updated
820          * with resend requests. Move final list to separate one for processing
821          */
822         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
823         list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
824         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
825
826         /**
827          * go through list of exports participated in recovery and
828          * set new epoch for them
829          */
830         list_for_each_entry(req, &client_list, rq_list) {
831                 LASSERT(!req->rq_export->exp_delayed);
832                 if (!req->rq_export->exp_vbr_failed)
833                         tgt_client_epoch_update(&env, req->rq_export);
834         }
835         /** return list back at once */
836         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
837         list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
838         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
839
840         /** Clear MULTI RPCS incompatibility flag if
841          * - target is MDT and
842          * - there is no client to recover or the recovery was aborted
843          */
844         if (!strncmp(tgt->lut_obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) &&
845             (atomic_read(&tgt->lut_obd->obd_max_recoverable_clients) == 0 ||
846             tgt->lut_obd->obd_abort_recovery))
847                 tgt->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
848
849         /** update server epoch */
850         tgt_server_data_update(&env, tgt, 1);
851         lu_env_fini(&env);
852 }
853
854 /**
855  * commit callback, need to update last_committed value
856  */
857 struct tgt_last_committed_callback {
858         struct dt_txn_commit_cb  llcc_cb;
859         struct lu_target        *llcc_tgt;
860         struct obd_export       *llcc_exp;
861         __u64                    llcc_transno;
862 };
863
864 static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
865                                   struct dt_txn_commit_cb *cb, int err)
866 {
867         struct tgt_last_committed_callback *ccb;
868
869         ccb = container_of(cb, struct tgt_last_committed_callback, llcc_cb);
870
871         LASSERT(ccb->llcc_exp);
872         LASSERT(ccb->llcc_tgt != NULL);
873         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
874
875         if (th->th_reserved_quota.qrr_count > 0) {
876                 struct lu_env            temp_env;
877                 int rc;
878
879                 CDEBUG(D_QUOTA, "free quota %llu %llu\n",
880                        th->th_reserved_quota.qrr_id.qid_gid,
881                        th->th_reserved_quota.qrr_count);
882
883                 rc = lu_env_init(&temp_env, LCT_DT_THREAD);
884                 if (rc) {
885                         CERROR("%s: can't initialize environment: rc = %d\n",
886                                ccb->llcc_tgt->lut_obd->obd_name, rc);
887                         goto out;
888                 }
889
890                 dt_reserve_or_free_quota(&temp_env, th->th_dev,
891                                          th->th_reserved_quota.qrr_type,
892                                          th->th_reserved_quota.qrr_id.qid_uid,
893                                          th->th_reserved_quota.qrr_id.qid_gid,
894                                          -th->th_reserved_quota.qrr_count,
895                                          false);
896                 lu_env_fini(&temp_env);
897         }
898
899         /* error hit, don't update last committed to provide chance to
900          * replay data after fail */
901         if (err != 0)
902                 goto out;
903
904         /* Fast path w/o spinlock, if exp_last_committed was updated
905          * with higher transno, no need to take spinlock and check,
906          * also no need to update obd_last_committed. */
907         if (ccb->llcc_transno <= ccb->llcc_exp->exp_last_committed)
908                 goto out;
909         spin_lock(&ccb->llcc_tgt->lut_translock);
910         if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
911                 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
912
913         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
914                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
915                 spin_unlock(&ccb->llcc_tgt->lut_translock);
916
917                 ptlrpc_commit_replies(ccb->llcc_exp);
918                 tgt_cancel_slc_locks(ccb->llcc_tgt, ccb->llcc_transno);
919         } else {
920                 spin_unlock(&ccb->llcc_tgt->lut_translock);
921         }
922
923         CDEBUG(D_HA, "%s: transno %lld is committed\n",
924                ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
925
926 out:
927         class_export_cb_put(ccb->llcc_exp);
928         OBD_FREE_PTR(ccb);
929 }
930
931 /**
932  * Add commit callback function, it returns a non-zero value to inform
933  * caller to use sync transaction if necessary.
934  */
935 static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
936                                   struct obd_export *exp, __u64 transno)
937 {
938         struct tgt_last_committed_callback      *ccb;
939         struct dt_txn_commit_cb                 *dcb;
940         int                                      rc;
941
942         OBD_ALLOC_PTR(ccb);
943         if (ccb == NULL)
944                 return -ENOMEM;
945
946         ccb->llcc_tgt = tgt;
947         ccb->llcc_exp = class_export_cb_get(exp);
948         ccb->llcc_transno = transno;
949
950         dcb = &ccb->llcc_cb;
951         dcb->dcb_func = tgt_cb_last_committed;
952         INIT_LIST_HEAD(&dcb->dcb_linkage);
953         strlcpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
954
955         rc = dt_trans_cb_add(th, dcb);
956         if (rc) {
957                 class_export_cb_put(exp);
958                 OBD_FREE_PTR(ccb);
959         }
960
961         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
962                 /* report failure to force synchronous operation */
963                 return -EPERM;
964
965         /* if exp_need_sync is set, return non-zero value to force
966          * a sync transaction. */
967         return rc ? rc : exp->exp_need_sync;
968 }
969
970 static int tgt_is_local_client(const struct lu_env *env,
971                                       struct obd_export *exp)
972 {
973         struct lu_target        *tgt = class_exp2tgt(exp);
974         struct tgt_session_info *tsi = tgt_ses_info(env);
975         struct ptlrpc_request   *req = tgt_ses_req(tsi);
976         struct lnet_nid nid;
977
978         if (exp_connect_flags(exp) & OBD_CONNECT_MDS ||
979             exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
980                 return 0;
981         if (tgt->lut_local_recovery)
982                 return 0;
983         if (!req)
984                 return 0;
985         lnet_nid4_to_nid(req->rq_peer.nid, &nid);
986         if (!LNetIsPeerLocal(&nid))
987                 return 0;
988
989         return 1;
990 }
991
992 /**
993  * Add new client to the last_rcvd upon new connection.
994  *
995  * We use a bitmap to locate a free space in the last_rcvd file and initialize
996  * tg_export_data.
997  */
998 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
999 {
1000         struct tg_export_data   *ted = &exp->exp_target_data;
1001         struct lu_target        *tgt = class_exp2tgt(exp);
1002         int                      rc = 0, idx;
1003
1004         ENTRY;
1005
1006         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
1007         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
1008                 RETURN(0);
1009
1010         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1011                 RETURN(0);
1012
1013         if (tgt_is_local_client(env, exp)) {
1014                 LCONSOLE_WARN("%s: local client %s w/o recovery\n",
1015                               exp->exp_obd->obd_name, ted->ted_lcd->lcd_uuid);
1016                 exp->exp_no_recovery = 1;
1017                 RETURN(0);
1018         }
1019
1020         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
1021          * there's no need for extra complication here
1022          */
1023         idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
1024 repeat:
1025         if (idx >= LR_MAX_CLIENTS ||
1026             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
1027                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
1028                        tgt->lut_obd->obd_name,  idx);
1029                 RETURN(-EOVERFLOW);
1030         }
1031         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1032                 idx = find_next_zero_bit(tgt->lut_client_bitmap,
1033                                              LR_MAX_CLIENTS, idx);
1034                 goto repeat;
1035         }
1036
1037         ted->ted_lr_idx = idx;
1038         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1039                           idx * tgt->lut_lsd.lsd_client_size;
1040
1041         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
1042
1043         if (tgt_is_multimodrpcs_client(exp)) {
1044                 /* Set MULTI RPCS incompatibility flag to prevent previous
1045                  * Lustre versions to mount a target with reply_data file */
1046                 atomic_inc(&tgt->lut_num_clients);
1047                 if (!(tgt->lut_lsd.lsd_feature_incompat &
1048                       OBD_INCOMPAT_MULTI_RPCS)) {
1049                         tgt->lut_lsd.lsd_feature_incompat |=
1050                                                         OBD_INCOMPAT_MULTI_RPCS;
1051                         rc = tgt_server_data_update(env, tgt, 1);
1052                         if (rc < 0) {
1053                                 CERROR("%s: unable to set MULTI RPCS "
1054                                        "incompatibility flag\n",
1055                                        exp->exp_obd->obd_name);
1056                                 RETURN(rc);
1057                         }
1058                 }
1059
1060                 /* assign client slot generation */
1061                 ted->ted_lcd->lcd_generation =
1062                                 atomic_inc_return(&tgt->lut_client_generation);
1063         } else {
1064                 ted->ted_lcd->lcd_generation = 0;
1065         }
1066
1067         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s' "
1068                "generation %d\n",
1069                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1070                ted->ted_lcd->lcd_uuid, ted->ted_lcd->lcd_generation);
1071
1072         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
1073                 RETURN(-ENOSPC);
1074
1075         rc = tgt_client_data_update(env, exp);
1076         if (rc)
1077                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
1078                        tgt->lut_obd->obd_name, idx, rc);
1079
1080         RETURN(rc);
1081 }
1082 EXPORT_SYMBOL(tgt_client_new);
1083
1084 /* Add an existing client to the MDS in-memory state based on
1085  * a client that was previously found in the last_rcvd file and
1086  * already has an assigned slot (idx >= 0).
1087  *
1088  * It should not be possible to fail adding an existing client - otherwise
1089  * mdt_init_server_data() callsite needs to be fixed.
1090  */
1091 int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
1092 {
1093         struct tg_export_data   *ted = &exp->exp_target_data;
1094         struct lu_target        *tgt = class_exp2tgt(exp);
1095
1096         ENTRY;
1097
1098         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
1099         LASSERTF(idx >= 0, "%d\n", idx);
1100
1101         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
1102             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1103                 RETURN(0);
1104
1105         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1106                 CERROR("%s: client %d: bit already set in bitmap!!\n",
1107                        tgt->lut_obd->obd_name,  idx);
1108                 LBUG();
1109         }
1110         atomic_inc(&tgt->lut_num_clients);
1111
1112         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added, "
1113                "generation %d\n",
1114                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid,
1115                ted->ted_lcd->lcd_generation);
1116
1117         ted->ted_lr_idx = idx;
1118         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1119                           idx * tgt->lut_lsd.lsd_client_size;
1120
1121         mutex_init(&ted->ted_lcd_lock);
1122
1123         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
1124
1125         RETURN(0);
1126 }
1127
1128 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
1129 {
1130         struct tg_export_data   *ted = &exp->exp_target_data;
1131         struct lu_target        *tgt = class_exp2tgt(exp);
1132         int                      rc;
1133
1134         ENTRY;
1135
1136         LASSERT(ted->ted_lcd);
1137
1138         if (unlikely(tgt == NULL)) {
1139                 CDEBUG(D_ERROR, "%s: No target for connected export\n",
1140                        class_exp2obd(exp)->obd_name);
1141                 RETURN(-EINVAL);
1142         }
1143
1144         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
1145         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
1146                     (char *)tgt->lut_obd->obd_uuid.uuid) ||
1147             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT ||
1148             exp->exp_no_recovery)
1149                 RETURN(0);
1150
1151         /* Slot may be not yet assigned, use case is race between Client
1152          * reconnect and forced eviction */
1153         if (ted->ted_lr_idx < 0) {
1154                 CWARN("%s: client with UUID '%s' not in bitmap\n",
1155                       tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid);
1156                 RETURN(0);
1157         }
1158
1159         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
1160                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1161                ted->ted_lcd->lcd_uuid);
1162
1163         /* Clear the bit _after_ zeroing out the client so we don't
1164            race with filter_client_add and zero out new clients.*/
1165         if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
1166                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
1167                        tgt->lut_obd->obd_name, ted->ted_lr_idx);
1168                 LBUG();
1169         }
1170
1171         /* Do not erase record for recoverable client. */
1172         if (exp->exp_flags & OBD_OPT_FAILOVER)
1173                 RETURN(0);
1174
1175         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_DEL))
1176                 RETURN(0);
1177
1178         /* Make sure the server's last_transno is up to date.
1179          * This should be done before zeroing client slot so last_transno will
1180          * be in server data or in client data in case of failure */
1181         rc = tgt_server_data_update(env, tgt, 0);
1182         if (rc != 0) {
1183                 CERROR("%s: failed to update server data, skip client %s "
1184                        "zeroing, rc %d\n", tgt->lut_obd->obd_name,
1185                        ted->ted_lcd->lcd_uuid, rc);
1186                 RETURN(rc);
1187         }
1188
1189         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
1190         rc = tgt_client_data_update(env, exp);
1191
1192         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
1193                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
1194                tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
1195                ted->ted_lr_idx, ted->ted_lr_off, rc);
1196         RETURN(rc);
1197 }
1198 EXPORT_SYMBOL(tgt_client_del);
1199
1200 static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
1201 {
1202         struct tg_export_data   *ted = &exp->exp_target_data;
1203         struct lu_target        *lut = class_exp2tgt(exp);
1204         struct tg_reply_data    *trd, *tmp;
1205
1206         if (tag == 0)
1207                 return;
1208
1209         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
1210                 if (trd->trd_tag != tag)
1211                         continue;
1212
1213                 LASSERT(ergo(tgt_is_increasing_xid_client(exp),
1214                              trd->trd_reply.lrd_xid <= xid));
1215
1216                 ted->ted_release_tag++;
1217                 tgt_release_reply_data(lut, ted, trd);
1218         }
1219 }
1220
1221 static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
1222                        struct tg_export_data *ted, struct tg_reply_data *trd,
1223                        struct ptlrpc_request *req,
1224                        struct thandle *th, bool update_lrd_file)
1225 {
1226         struct lsd_reply_data   *lrd;
1227         int     i;
1228         int     rc;
1229
1230         lrd = &trd->trd_reply;
1231         /* update export last transno */
1232         mutex_lock(&ted->ted_lcd_lock);
1233         if (lrd->lrd_transno > ted->ted_lcd->lcd_last_transno)
1234                 ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
1235         mutex_unlock(&ted->ted_lcd_lock);
1236
1237         if (tgt != NULL) {
1238                 /* find a empty slot */
1239                 i = tgt_find_free_reply_slot(tgt);
1240                 if (unlikely(i < 0)) {
1241                         CERROR("%s: couldn't find a slot for reply data: "
1242                                "rc = %d\n", tgt_name(tgt), i);
1243                         RETURN(i);
1244                 }
1245                 trd->trd_index = i;
1246
1247                 if (update_lrd_file) {
1248                         loff_t  off;
1249
1250                         /* write reply data to disk */
1251                         off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
1252                         rc = tgt_reply_data_write(env, tgt, lrd, off, th);
1253                         if (unlikely(rc != 0)) {
1254                                 CERROR("%s: can't update %s file: rc = %d\n",
1255                                        tgt_name(tgt), REPLY_DATA, rc);
1256                                 GOTO(free_slot, rc);
1257                         }
1258                 }
1259         } else {
1260                 trd->trd_index = TRD_INDEX_MEMORY;
1261         }
1262
1263         /* add reply data to target export's reply list */
1264         mutex_lock(&ted->ted_lcd_lock);
1265         if (req != NULL) {
1266                 int exclude = tgt_is_increasing_xid_client(req->rq_export) ?
1267                               MSG_REPLAY : MSG_REPLAY|MSG_RESENT;
1268
1269                 if (req->rq_obsolete) {
1270                         CDEBUG(D_INFO,
1271                                "drop reply data update for obsolete req xid=%llu,"
1272                                "transno=%llu, tag=%hu\n", req->rq_xid,
1273                                lrd->lrd_transno, trd->trd_tag);
1274                         mutex_unlock(&ted->ted_lcd_lock);
1275                         GOTO(free_slot, rc = -EBADR);
1276                 }
1277
1278                 if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude))
1279                         tgt_clean_by_tag(req->rq_export, req->rq_xid,
1280                                          trd->trd_tag);
1281         }
1282         list_add(&trd->trd_list, &ted->ted_reply_list);
1283         ted->ted_reply_cnt++;
1284         if (ted->ted_reply_cnt > ted->ted_reply_max)
1285                 ted->ted_reply_max = ted->ted_reply_cnt;
1286         mutex_unlock(&ted->ted_lcd_lock);
1287
1288         CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
1289                "tag %hu, client gen %u, slot idx %d\n",
1290                trd, lrd->lrd_xid, lrd->lrd_transno,
1291                trd->trd_tag, lrd->lrd_client_gen, trd->trd_index);
1292
1293         RETURN(0);
1294
1295 free_slot:
1296         if (tgt != NULL)
1297                 tgt_clear_reply_slot(tgt, trd->trd_index);
1298         return rc;
1299 }
1300
1301 int tgt_mk_reply_data(const struct lu_env *env,
1302                       struct lu_target *tgt,
1303                       struct tg_export_data *ted,
1304                       struct ptlrpc_request *req,
1305                       __u64 opdata,
1306                       struct thandle *th,
1307                       bool write_update,
1308                       __u64 transno)
1309 {
1310         struct tg_reply_data    *trd;
1311         struct lsd_reply_data   *lrd;
1312         __u64                   *pre_versions = NULL;
1313         int                     rc;
1314         struct tgt_session_info *tsi = NULL;
1315
1316         OBD_ALLOC_PTR(trd);
1317         if (unlikely(trd == NULL))
1318                 RETURN(-ENOMEM);
1319
1320         if (env != NULL)
1321                 tsi = tgt_ses_info(env);
1322
1323         /* fill reply data information */
1324         lrd = &trd->trd_reply;
1325         lrd->lrd_transno = transno;
1326         if (req != NULL) {
1327                 lrd->lrd_xid = req->rq_xid;
1328                 trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
1329                 lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
1330                 if (write_update) {
1331                         pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1332                         lrd->lrd_result = th->th_result;
1333                 }
1334         } else {
1335                 LASSERT(env != NULL);
1336                 LASSERT(tsi->tsi_xid != 0);
1337
1338                 lrd->lrd_xid = tsi->tsi_xid;
1339                 lrd->lrd_result = tsi->tsi_result;
1340                 lrd->lrd_client_gen = tsi->tsi_client_gen;
1341         }
1342
1343         lrd->lrd_data = opdata;
1344         if (pre_versions) {
1345                 trd->trd_pre_versions[0] = pre_versions[0];
1346                 trd->trd_pre_versions[1] = pre_versions[1];
1347                 trd->trd_pre_versions[2] = pre_versions[2];
1348                 trd->trd_pre_versions[3] = pre_versions[3];
1349         }
1350
1351         if (tsi && tsi->tsi_open_obj)
1352                 trd->trd_object = *lu_object_fid(&tsi->tsi_open_obj->do_lu);
1353
1354         rc = tgt_add_reply_data(env, tgt, ted, trd, req,
1355                                 th, write_update);
1356         if (rc < 0) {
1357                 OBD_FREE_PTR(trd);
1358                 if (rc == -EBADR)
1359                         rc = 0;
1360         }
1361         return rc;
1362
1363 }
1364 EXPORT_SYMBOL(tgt_mk_reply_data);
1365
1366 /*
1367  * last_rcvd & last_committed update callbacks
1368  */
1369 static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
1370                                 struct dt_object *obj, __u64 opdata,
1371                                 struct thandle *th, struct ptlrpc_request *req)
1372 {
1373         struct tgt_thread_info  *tti = tgt_th_info(env);
1374         struct tgt_session_info *tsi = tgt_ses_info(env);
1375         struct obd_export *exp = tsi->tsi_exp;
1376         struct tg_export_data *ted;
1377         __u64 *transno_p;
1378         bool nolcd = false;
1379         int rc = 0;
1380
1381         ENTRY;
1382
1383
1384         LASSERT(exp != NULL);
1385         ted = &exp->exp_target_data;
1386
1387         /* Some clients don't support recovery, and they don't have last_rcvd
1388          * client data:
1389          * 1. lightweight clients.
1390          * 2. local clients on MDS which doesn't enable "localrecov".
1391          * 3. OFD connect may cause transaction before export has last_rcvd
1392          *    slot.
1393          */
1394         if (ted->ted_lr_idx < 0)
1395                 nolcd = true;
1396
1397         if (req != NULL)
1398                 tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1399         else
1400                 /* From update replay, tti_transno should be set already */
1401                 LASSERT(tti->tti_transno != 0);
1402
1403         spin_lock(&tgt->lut_translock);
1404         if (th->th_result != 0) {
1405                 if (tti->tti_transno != 0) {
1406                         CERROR("%s: replay transno %llu failed: rc = %d\n",
1407                                tgt_name(tgt), tti->tti_transno, th->th_result);
1408                 }
1409         } else if (tti->tti_transno == 0) {
1410                 tti->tti_transno = ++tgt->lut_last_transno;
1411         } else {
1412                 /* should be replay */
1413                 if (tti->tti_transno > tgt->lut_last_transno)
1414                         tgt->lut_last_transno = tti->tti_transno;
1415         }
1416         spin_unlock(&tgt->lut_translock);
1417
1418         /** VBR: set new versions */
1419         if (th->th_result == 0 && obj != NULL) {
1420                 struct dt_object *dto = dt_object_locate(obj, th->th_dev);
1421                 dt_version_set(env, dto, tti->tti_transno, th);
1422         }
1423
1424         /* filling reply data */
1425         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1426                tti->tti_transno, tgt->lut_obd->obd_last_committed);
1427
1428         if (req != NULL) {
1429                 req->rq_transno = tti->tti_transno;
1430                 lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
1431         }
1432
1433         /* if can't add callback, do sync write */
1434         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp, tti->tti_transno);
1435
1436         if (nolcd) {
1437                 /* store transno in the last_rcvd header */
1438                 spin_lock(&tgt->lut_translock);
1439                 if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
1440                         tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
1441                         spin_unlock(&tgt->lut_translock);
1442                         /* Although current connection doesn't have slot
1443                          * in the last_rcvd, we still want to maintain
1444                          * the in-memory lsd_client_data structure in order to
1445                          * properly handle reply reconstruction. */
1446                         rc = tgt_server_data_write(env, tgt, th);
1447                 } else {
1448                         spin_unlock(&tgt->lut_translock);
1449                 }
1450         } else if (ted->ted_lr_off == 0) {
1451                 CERROR("%s: client idx %d has offset %lld\n",
1452                        tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
1453                 RETURN(-EINVAL);
1454         }
1455
1456         /* Target that supports multiple reply data */
1457         if (tgt_is_multimodrpcs_client(exp)) {
1458                 return tgt_mk_reply_data(env, tgt, ted, req, opdata, th,
1459                                          !!(req != NULL), tti->tti_transno);
1460         }
1461
1462         /* Enough for update replay, let's return */
1463         if (req == NULL)
1464                 RETURN(rc);
1465
1466         mutex_lock(&ted->ted_lcd_lock);
1467         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1468         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
1469                 transno_p = &ted->ted_lcd->lcd_last_close_transno;
1470                 ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
1471                 ted->ted_lcd->lcd_last_close_result = th->th_result;
1472         } else {
1473                 /* VBR: save versions in last_rcvd for reconstruct. */
1474                 __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1475
1476                 if (pre_versions) {
1477                         ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
1478                         ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
1479                         ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
1480                         ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
1481                 }
1482                 transno_p = &ted->ted_lcd->lcd_last_transno;
1483                 ted->ted_lcd->lcd_last_xid = req->rq_xid;
1484                 ted->ted_lcd->lcd_last_result = th->th_result;
1485                 /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
1486                  * see struct ldlm_reply->lock_policy_res1; */
1487                 ted->ted_lcd->lcd_last_data = opdata;
1488         }
1489
1490         /* Update transno in slot only if non-zero number, i.e. no errors */
1491         if (likely(tti->tti_transno != 0)) {
1492                 /* Don't overwrite bigger transaction number with lower one.
1493                  * That is not sign of problem in all cases, but in any case
1494                  * this value should be monotonically increased only. */
1495                 if (*transno_p > tti->tti_transno) {
1496                         if (!tgt->lut_no_reconstruct) {
1497                                 CERROR("%s: trying to overwrite bigger transno:"
1498                                        "on-disk: %llu, new: %llu replay: "
1499                                        "%d. See LU-617.\n", tgt_name(tgt),
1500                                        *transno_p, tti->tti_transno,
1501                                        req_is_replay(req));
1502                                 if (req_is_replay(req)) {
1503                                         spin_lock(&req->rq_export->exp_lock);
1504                                         req->rq_export->exp_vbr_failed = 1;
1505                                         spin_unlock(&req->rq_export->exp_lock);
1506                                 }
1507                                 mutex_unlock(&ted->ted_lcd_lock);
1508                                 RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
1509                         }
1510                 } else {
1511                         *transno_p = tti->tti_transno;
1512                 }
1513         }
1514
1515         if (!nolcd) {
1516                 tti->tti_off = ted->ted_lr_off;
1517                 if (CFS_FAIL_CHECK(OBD_FAIL_TGT_RCVD_EIO))
1518                         rc = -EIO;
1519                 else
1520                         rc = tgt_client_data_write(env, tgt, ted->ted_lcd,
1521                                                    &tti->tti_off, th);
1522                 if (rc < 0) {
1523                         mutex_unlock(&ted->ted_lcd_lock);
1524                         RETURN(rc);
1525                 }
1526         }
1527         mutex_unlock(&ted->ted_lcd_lock);
1528         RETURN(rc);
1529 }
1530
1531 /*
1532  * last_rcvd update for echo client simulation.
1533  * It updates last_rcvd client slot and version of object in
1534  * simple way but with all locks to simulate all drawbacks
1535  */
1536 static int tgt_last_rcvd_update_echo(const struct lu_env *env,
1537                                      struct lu_target *tgt,
1538                                      struct dt_object *obj,
1539                                      struct thandle *th,
1540                                      struct obd_export *exp)
1541 {
1542         struct tgt_thread_info  *tti = tgt_th_info(env);
1543         struct tg_export_data   *ted = &exp->exp_target_data;
1544         int                      rc = 0;
1545
1546         ENTRY;
1547
1548         tti->tti_transno = 0;
1549
1550         spin_lock(&tgt->lut_translock);
1551         if (th->th_result == 0)
1552                 tti->tti_transno = ++tgt->lut_last_transno;
1553         spin_unlock(&tgt->lut_translock);
1554
1555         /** VBR: set new versions */
1556         if (th->th_result == 0 && obj != NULL)
1557                 dt_version_set(env, obj, tti->tti_transno, th);
1558
1559         /* if can't add callback, do sync write */
1560         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
1561                                                 tti->tti_transno);
1562
1563         LASSERT(ted->ted_lr_off > 0);
1564
1565         mutex_lock(&ted->ted_lcd_lock);
1566         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1567         ted->ted_lcd->lcd_last_transno = tti->tti_transno;
1568         ted->ted_lcd->lcd_last_result = th->th_result;
1569
1570         tti->tti_off = ted->ted_lr_off;
1571         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
1572         mutex_unlock(&ted->ted_lcd_lock);
1573         RETURN(rc);
1574 }
1575
1576 static int tgt_clients_data_init(const struct lu_env *env,
1577                                  struct lu_target *tgt,
1578                                  unsigned long last_size)
1579 {
1580         struct obd_device       *obd = tgt->lut_obd;
1581         struct lr_server_data   *lsd = &tgt->lut_lsd;
1582         struct lsd_client_data  *lcd = NULL;
1583         struct tg_export_data   *ted;
1584         int                      cl_idx;
1585         int                      rc = 0;
1586         loff_t                   off = lsd->lsd_client_start;
1587         __u32                    generation = 0;
1588         struct cfs_hash         *hash = NULL;
1589
1590         ENTRY;
1591
1592         if (tgt->lut_bottom->dd_rdonly)
1593                 RETURN(0);
1594
1595         BUILD_BUG_ON(offsetof(struct lsd_client_data, lcd_padding) +
1596                      sizeof(lcd->lcd_padding) != LR_CLIENT_SIZE);
1597
1598         OBD_ALLOC_PTR(lcd);
1599         if (lcd == NULL)
1600                 RETURN(-ENOMEM);
1601
1602         hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
1603         if (hash == NULL)
1604                 GOTO(err_out, rc = -ENODEV);
1605
1606         for (cl_idx = 0; off < last_size; cl_idx++) {
1607                 struct obd_export       *exp;
1608                 __u64                    last_transno;
1609
1610                 /* Don't assume off is incremented properly by
1611                  * read_record(), in case sizeof(*lcd)
1612                  * isn't the same as fsd->lsd_client_size.  */
1613                 off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
1614                 rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
1615                 if (rc) {
1616                         CERROR("%s: error reading last_rcvd %s idx %d off "
1617                                "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
1618                                cl_idx, off, rc);
1619                         rc = 0;
1620                         break; /* read error shouldn't cause startup to fail */
1621                 }
1622
1623                 if (lcd->lcd_uuid[0] == '\0') {
1624                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
1625                                cl_idx);
1626                         continue;
1627                 }
1628
1629                 last_transno = lcd_last_transno(lcd);
1630
1631                 /* These exports are cleaned up by disconnect, so they
1632                  * need to be set up like real exports as connect does.
1633                  */
1634                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: %llu"
1635                        " srv lr: %llu lx: %llu gen %u\n", lcd->lcd_uuid,
1636                        cl_idx, last_transno, lsd->lsd_last_transno,
1637                        lcd_last_xid(lcd), lcd->lcd_generation);
1638
1639                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
1640                 if (IS_ERR(exp)) {
1641                         if (PTR_ERR(exp) == -EALREADY) {
1642                                 /* export already exists, zero out this one */
1643                                 CERROR("%s: Duplicate export %s!\n",
1644                                        tgt_name(tgt), lcd->lcd_uuid);
1645                                 continue;
1646                         }
1647                         GOTO(err_out, rc = PTR_ERR(exp));
1648                 }
1649
1650                 ted = &exp->exp_target_data;
1651                 *ted->ted_lcd = *lcd;
1652
1653                 rc = tgt_client_add(env, exp, cl_idx);
1654                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
1655                 /* VBR: set export last committed version */
1656                 exp->exp_last_committed = last_transno;
1657                 spin_lock(&exp->exp_lock);
1658                 exp->exp_connecting = 0;
1659                 exp->exp_in_recovery = 0;
1660                 spin_unlock(&exp->exp_lock);
1661                 atomic_inc(&obd->obd_max_recoverable_clients);
1662
1663                 if (tgt->lut_lsd.lsd_feature_incompat &
1664                     OBD_INCOMPAT_MULTI_RPCS &&
1665                     lcd->lcd_generation != 0) {
1666                         /* compute the highest valid client generation */
1667                         generation = max(generation, lcd->lcd_generation);
1668                         /* fill client_generation <-> export hash table */
1669                         rc = cfs_hash_add_unique(hash, &lcd->lcd_generation,
1670                                                  &exp->exp_gen_hash);
1671                         if (rc != 0) {
1672                                 CERROR("%s: duplicate export for client "
1673                                        "generation %u\n",
1674                                        tgt_name(tgt), lcd->lcd_generation);
1675                                 class_export_put(exp);
1676                                 GOTO(err_out, rc);
1677                         }
1678                 }
1679
1680                 class_export_put(exp);
1681
1682                 rc = rev_import_init(exp);
1683                 if (rc != 0) {
1684                         class_unlink_export(exp);
1685                         GOTO(err_out, rc);
1686                 }
1687
1688                 /* Need to check last_rcvd even for duplicated exports. */
1689                 CDEBUG(D_OTHER, "client at idx %d has last_transno = %llu\n",
1690                        cl_idx, last_transno);
1691
1692                 spin_lock(&tgt->lut_translock);
1693                 tgt->lut_last_transno = max(last_transno,
1694                                             tgt->lut_last_transno);
1695                 spin_unlock(&tgt->lut_translock);
1696         }
1697
1698         /* record highest valid client generation */
1699         atomic_set(&tgt->lut_client_generation, generation);
1700
1701 err_out:
1702         if (hash != NULL)
1703                 cfs_hash_putref(hash);
1704         OBD_FREE_PTR(lcd);
1705         RETURN(rc);
1706 }
1707
1708 struct server_compat_data {
1709         __u32 rocompat;
1710         __u32 incompat;
1711         __u32 rocinit;
1712         __u32 incinit;
1713 };
1714
1715 static struct server_compat_data tgt_scd[] = {
1716         [LDD_F_SV_TYPE_MDT] = {
1717                 .rocompat = OBD_ROCOMPAT_LOVOBJID,
1718                 .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1719                             OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
1720                             OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI |
1721                             OBD_INCOMPAT_MULTI_RPCS,
1722                 .rocinit = OBD_ROCOMPAT_LOVOBJID,
1723                 .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1724                            OBD_INCOMPAT_MULTI_OI,
1725         },
1726         [LDD_F_SV_TYPE_OST] = {
1727                 .rocompat = OBD_ROCOMPAT_IDX_IN_IDIF,
1728                 .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
1729                             OBD_INCOMPAT_FID,
1730                 .rocinit = OBD_ROCOMPAT_IDX_IN_IDIF,
1731                 .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
1732         }
1733 };
1734
1735 int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
1736 {
1737         struct tgt_thread_info          *tti = tgt_th_info(env);
1738         struct lr_server_data           *lsd = &tgt->lut_lsd;
1739         unsigned long                    last_rcvd_size;
1740         __u32                            index;
1741         int                              rc, type;
1742
1743         rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr);
1744         if (rc)
1745                 RETURN(rc);
1746
1747         last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
1748
1749         /* ensure padding in the struct is the correct size */
1750         BUILD_BUG_ON(offsetof(struct lr_server_data, lsd_padding) +
1751                      sizeof(lsd->lsd_padding) != LR_SERVER_SIZE);
1752
1753         rc = server_name2index(tgt_name(tgt), &index, NULL);
1754         if (rc < 0) {
1755                 CERROR("%s: Can not get index from name: rc = %d\n",
1756                        tgt_name(tgt), rc);
1757                 RETURN(rc);
1758         }
1759         /* server_name2index() returns type */
1760         type = rc;
1761         if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
1762                 CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
1763                 RETURN(-EINVAL);
1764         }
1765
1766         /* last_rcvd on OST doesn't provide reconstruct support because there
1767          * may be up to 8 in-flight write requests per single slot in
1768          * last_rcvd client data
1769          */
1770         tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
1771
1772         if (last_rcvd_size == 0) {
1773                 LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
1774
1775                 memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
1776                        sizeof(lsd->lsd_uuid));
1777                 lsd->lsd_last_transno = 0;
1778                 lsd->lsd_mount_count = 0;
1779                 lsd->lsd_server_size = LR_SERVER_SIZE;
1780                 lsd->lsd_client_start = LR_CLIENT_START;
1781                 lsd->lsd_client_size = LR_CLIENT_SIZE;
1782                 lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
1783                 lsd->lsd_osd_index = index;
1784                 lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
1785                 lsd->lsd_feature_incompat = tgt_scd[type].incinit;
1786         } else {
1787                 rc = tgt_server_data_read(env, tgt);
1788                 if (rc) {
1789                         CERROR("%s: error reading LAST_RCVD: rc= %d\n",
1790                                tgt_name(tgt), rc);
1791                         RETURN(rc);
1792                 }
1793                 if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
1794                         if (tgt->lut_bottom->dd_rdonly) {
1795                                 /* Such difference may be caused by mounting
1796                                  * up snapshot with new fsname under rd_only
1797                                  * mode. But even if it was NOT, it will not
1798                                  * damage the system because of "rd_only". */
1799                                 memcpy(lsd->lsd_uuid,
1800                                        tgt->lut_obd->obd_uuid.uuid,
1801                                        sizeof(lsd->lsd_uuid));
1802                         } else {
1803                                 LCONSOLE_ERROR_MSG(0x157, "Trying to start "
1804                                                    "OBD %s using the wrong "
1805                                                    "disk %s. Were the /dev/ "
1806                                                    "assignments rearranged?\n",
1807                                                    tgt->lut_obd->obd_uuid.uuid,
1808                                                    lsd->lsd_uuid);
1809                                 RETURN(-EINVAL);
1810                         }
1811                 }
1812
1813                 if (lsd->lsd_osd_index != index) {
1814                         LCONSOLE_ERROR_MSG(0x157,
1815                                            "%s: index %d in last rcvd is different with the index %d in config log, It might be disk corruption!\n",
1816                                            tgt_name(tgt),
1817                                            lsd->lsd_osd_index, index);
1818                         RETURN(-EINVAL);
1819                 }
1820         }
1821
1822         if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
1823                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
1824                        tgt_name(tgt),
1825                        lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
1826                 RETURN(-EINVAL);
1827         }
1828
1829         if (type == LDD_F_SV_TYPE_MDT)
1830                 lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
1831
1832         if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
1833                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
1834                        tgt_name(tgt),
1835                        lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
1836                 RETURN(-EINVAL);
1837         }
1838         /** Interop: evict all clients at first boot with 1.8 last_rcvd */
1839         if (type == LDD_F_SV_TYPE_MDT &&
1840             !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
1841                 if (last_rcvd_size > lsd->lsd_client_start) {
1842                         LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
1843                                       "remove all clients for interop needs\n",
1844                                       tgt_name(tgt));
1845                         rc = tgt_truncate_last_rcvd(env, tgt,
1846                                                     lsd->lsd_client_start);
1847                         if (rc)
1848                                 RETURN(rc);
1849                         last_rcvd_size = lsd->lsd_client_start;
1850                 }
1851                 /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
1852                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
1853         }
1854
1855         spin_lock(&tgt->lut_translock);
1856         tgt->lut_last_transno = lsd->lsd_last_transno;
1857         spin_unlock(&tgt->lut_translock);
1858
1859         lsd->lsd_mount_count++;
1860
1861         CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
1862         CDEBUG(D_INODE, "%s: server last_transno: %llu\n",
1863                tgt_name(tgt), tgt->lut_last_transno);
1864         CDEBUG(D_INODE, "%s: server mount_count: %llu\n",
1865                tgt_name(tgt), lsd->lsd_mount_count);
1866         CDEBUG(D_INODE, "%s: server data size: %u\n",
1867                tgt_name(tgt), lsd->lsd_server_size);
1868         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
1869                tgt_name(tgt), lsd->lsd_client_start);
1870         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
1871                tgt_name(tgt), lsd->lsd_client_size);
1872         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
1873                tgt_name(tgt), last_rcvd_size);
1874         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
1875                tgt_name(tgt), lsd->lsd_subdir_count);
1876         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
1877                last_rcvd_size <= lsd->lsd_client_start ? 0 :
1878                (last_rcvd_size - lsd->lsd_client_start) /
1879                 lsd->lsd_client_size);
1880         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
1881
1882         if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
1883             lsd->lsd_client_size == 0) {
1884                 CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
1885                 RETURN(-EINVAL);
1886         }
1887
1888         if (!tgt->lut_obd->obd_replayable)
1889                 CWARN("%s: recovery support OFF\n", tgt_name(tgt));
1890
1891         rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
1892         if (rc < 0)
1893                 GOTO(err_client, rc);
1894
1895         spin_lock(&tgt->lut_translock);
1896         /* obd_last_committed is used for compatibility
1897          * with other lustre recovery code */
1898         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
1899         spin_unlock(&tgt->lut_translock);
1900
1901         tgt->lut_obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
1902         tgt->lut_obd->u.obt.obt_instance = (__u32)lsd->lsd_mount_count;
1903
1904         /* save it, so mount count and last_transno is current */
1905         rc = tgt_server_data_update(env, tgt, 0);
1906         if (rc < 0)
1907                 GOTO(err_client, rc);
1908
1909         RETURN(0);
1910
1911 err_client:
1912         class_disconnect_exports(tgt->lut_obd);
1913         return rc;
1914 }
1915
1916 /* add credits for last_rcvd update */
1917 int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
1918                      void *cookie)
1919 {
1920         struct lu_target        *tgt = cookie;
1921         struct tgt_session_info *tsi;
1922         struct tgt_thread_info  *tti = tgt_th_info(env);
1923         struct dt_object        *dto;
1924         int                      rc;
1925
1926         /* For readonly case, the caller should have got failure
1927          * when start the transaction. If the logic comes here,
1928          * there must be something wrong. */
1929         if (unlikely(tgt->lut_bottom->dd_rdonly)) {
1930                 dump_stack();
1931                 LBUG();
1932         }
1933
1934         /* if there is no session, then this transaction is not result of
1935          * request processing but some local operation */
1936         if (env->le_ses == NULL)
1937                 return 0;
1938
1939         LASSERT(tgt->lut_last_rcvd);
1940         tsi = tgt_ses_info(env);
1941         /* OFD may start transaction without export assigned */
1942         if (tsi->tsi_exp == NULL)
1943                 return 0;
1944
1945         if (tgt_is_multimodrpcs_client(tsi->tsi_exp)) {
1946                 /*
1947                  * Use maximum possible file offset for declaration to ensure
1948                  * ZFS will reserve enough credits for a write anywhere in this
1949                  * file, since we don't know where in the file the write will be
1950                  * because a replay slot has not been assigned.  This should be
1951                  * replaced by dmu_tx_hold_append() when available.
1952                  */
1953                 tti->tti_buf.lb_buf = NULL;
1954                 tti->tti_buf.lb_len = sizeof(struct lsd_reply_data);
1955                 dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
1956                 rc = dt_declare_record_write(env, dto, &tti->tti_buf, -1, th);
1957                 if (rc)
1958                         return rc;
1959         } else {
1960                 dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
1961                 tti_buf_lcd(tti);
1962                 tti->tti_off = tsi->tsi_exp->exp_target_data.ted_lr_off;
1963                 rc = dt_declare_record_write(env, dto, &tti->tti_buf,
1964                                              tti->tti_off, th);
1965                 if (rc)
1966                         return rc;
1967         }
1968
1969         if (tsi->tsi_vbr_obj != NULL &&
1970             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
1971                 dto = dt_object_locate(tsi->tsi_vbr_obj, th->th_dev);
1972                 rc = dt_declare_version_set(env, dto, th);
1973         }
1974
1975         return rc;
1976 }
1977
1978 /* Update last_rcvd records with latests transaction data */
1979 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
1980                     void *cookie)
1981 {
1982         struct lu_target        *tgt = cookie;
1983         struct tgt_session_info *tsi;
1984         struct tgt_thread_info  *tti = tgt_th_info(env);
1985         struct dt_object        *obj = NULL;
1986         int                      rc;
1987         bool                     echo_client;
1988
1989         if (env->le_ses == NULL)
1990                 return 0;
1991
1992         tsi = tgt_ses_info(env);
1993         /* OFD may start transaction without export assigned */
1994         if (tsi->tsi_exp == NULL)
1995                 return 0;
1996
1997         echo_client = (tgt_ses_req(tsi) == NULL && tsi->tsi_xid == 0);
1998
1999         if (tti->tti_has_trans && !echo_client) {
2000                 if (tti->tti_mult_trans == 0) {
2001                         CDEBUG(D_HA, "More than one transaction %llu\n",
2002                                tti->tti_transno);
2003                         RETURN(0);
2004                 }
2005                 /* we need another transno to be assigned */
2006                 tti->tti_transno = 0;
2007         } else if (th->th_result == 0) {
2008                 tti->tti_has_trans = 1;
2009         }
2010
2011         if (tsi->tsi_vbr_obj != NULL &&
2012             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
2013                 obj = tsi->tsi_vbr_obj;
2014         }
2015
2016         if (unlikely(echo_client)) /* echo client special case */
2017                 rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
2018                                                tsi->tsi_exp);
2019         else
2020                 rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
2021                                           tgt_ses_req(tsi));
2022         return rc;
2023 }
2024
2025 int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
2026 {
2027         struct tgt_thread_info  *tti = tgt_th_info(env);
2028         struct lsd_reply_data   *lrd = &tti->tti_lrd;
2029         unsigned long            reply_data_size;
2030         int                      rc;
2031         struct lsd_reply_header *lrh = NULL;
2032         struct tg_reply_data    *trd = NULL;
2033         int                      idx;
2034         loff_t                   off;
2035         struct cfs_hash         *hash = NULL;
2036         struct obd_export       *exp;
2037         struct tg_export_data   *ted;
2038         int                      reply_data_recovered = 0;
2039
2040         rc = dt_attr_get(env, tgt->lut_reply_data, &tti->tti_attr);
2041         if (rc)
2042                 GOTO(out, rc);
2043         reply_data_size = (unsigned long)tti->tti_attr.la_size;
2044
2045         OBD_ALLOC_PTR(lrh);
2046         if (lrh == NULL)
2047                 GOTO(out, rc = -ENOMEM);
2048
2049         if (reply_data_size == 0) {
2050                 CDEBUG(D_INFO, "%s: new reply_data file, initializing\n",
2051                        tgt_name(tgt));
2052                 lrh->lrh_magic = LRH_MAGIC;
2053                 lrh->lrh_header_size = sizeof(struct lsd_reply_header);
2054                 lrh->lrh_reply_size = sizeof(struct lsd_reply_data);
2055                 rc = tgt_reply_header_write(env, tgt, lrh);
2056                 if (rc) {
2057                         CERROR("%s: error writing %s: rc = %d\n",
2058                                tgt_name(tgt), REPLY_DATA, rc);
2059                         GOTO(out, rc);
2060                 }
2061         } else {
2062                 rc = tgt_reply_header_read(env, tgt, lrh);
2063                 if (rc) {
2064                         CERROR("%s: error reading %s: rc = %d\n",
2065                                tgt_name(tgt), REPLY_DATA, rc);
2066                         GOTO(out, rc);
2067                 }
2068                 if (lrh->lrh_magic != LRH_MAGIC ||
2069                     lrh->lrh_header_size != sizeof(struct lsd_reply_header) ||
2070                     lrh->lrh_reply_size != sizeof(struct lsd_reply_data)) {
2071                         CERROR("%s: invalid header in %s\n",
2072                                tgt_name(tgt), REPLY_DATA);
2073                         GOTO(out, rc = -EINVAL);
2074                 }
2075
2076                 hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
2077                 if (hash == NULL)
2078                         GOTO(out, rc = -ENODEV);
2079
2080                 OBD_ALLOC_PTR(trd);
2081                 if (trd == NULL)
2082                         GOTO(out, rc = -ENOMEM);
2083
2084                 /* Load reply_data from disk */
2085                 for (idx = 0, off = sizeof(struct lsd_reply_header);
2086                      off < reply_data_size;
2087                      idx++, off += sizeof(struct lsd_reply_data)) {
2088                         rc = tgt_reply_data_read(env, tgt, lrd, off);
2089                         if (rc) {
2090                                 CERROR("%s: error reading %s: rc = %d\n",
2091                                        tgt_name(tgt), REPLY_DATA, rc);
2092                                 GOTO(out, rc);
2093                         }
2094
2095                         exp = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
2096                         if (exp == NULL) {
2097                                 /* old reply data from a disconnected client */
2098                                 continue;
2099                         }
2100                         ted = &exp->exp_target_data;
2101                         mutex_lock(&ted->ted_lcd_lock);
2102
2103                         /* create in-memory reply_data and link it to
2104                          * target export's reply list */
2105                         rc = tgt_set_reply_slot(tgt, idx);
2106                         if (rc != 0) {
2107                                 mutex_unlock(&ted->ted_lcd_lock);
2108                                 GOTO(out, rc);
2109                         }
2110                         trd->trd_reply = *lrd;
2111                         trd->trd_pre_versions[0] = 0;
2112                         trd->trd_pre_versions[1] = 0;
2113                         trd->trd_pre_versions[2] = 0;
2114                         trd->trd_pre_versions[3] = 0;
2115                         trd->trd_index = idx;
2116                         trd->trd_tag = 0;
2117                         fid_zero(&trd->trd_object);
2118                         list_add(&trd->trd_list, &ted->ted_reply_list);
2119                         ted->ted_reply_cnt++;
2120                         if (ted->ted_reply_cnt > ted->ted_reply_max)
2121                                 ted->ted_reply_max = ted->ted_reply_cnt;
2122
2123                         CDEBUG(D_HA, "%s: restore reply %p: xid %llu, "
2124                                "transno %llu, client gen %u, slot idx %d\n",
2125                                tgt_name(tgt), trd, lrd->lrd_xid,
2126                                lrd->lrd_transno, lrd->lrd_client_gen,
2127                                trd->trd_index);
2128
2129                         /* update export last committed transation */
2130                         exp->exp_last_committed = max(exp->exp_last_committed,
2131                                                       lrd->lrd_transno);
2132                         /* Update lcd_last_transno as well for check in
2133                          * tgt_release_reply_data() or the latest client
2134                          * transno can be lost.
2135                          */
2136                         ted->ted_lcd->lcd_last_transno =
2137                                 max(ted->ted_lcd->lcd_last_transno,
2138                                     exp->exp_last_committed);
2139
2140                         mutex_unlock(&ted->ted_lcd_lock);
2141                         class_export_put(exp);
2142
2143                         /* update target last committed transaction */
2144                         spin_lock(&tgt->lut_translock);
2145                         tgt->lut_last_transno = max(tgt->lut_last_transno,
2146                                                     lrd->lrd_transno);
2147                         spin_unlock(&tgt->lut_translock);
2148
2149                         reply_data_recovered++;
2150
2151                         OBD_ALLOC_PTR(trd);
2152                         if (trd == NULL)
2153                                 GOTO(out, rc = -ENOMEM);
2154                 }
2155                 CDEBUG(D_INFO, "%s: %d reply data have been recovered\n",
2156                        tgt_name(tgt), reply_data_recovered);
2157         }
2158
2159         spin_lock(&tgt->lut_translock);
2160         /* obd_last_committed is used for compatibility
2161          * with other lustre recovery code */
2162         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
2163         spin_unlock(&tgt->lut_translock);
2164
2165         rc = 0;
2166
2167 out:
2168         if (hash != NULL)
2169                 cfs_hash_putref(hash);
2170         if (trd != NULL)
2171                 OBD_FREE_PTR(trd);
2172         if (lrh != NULL)
2173                 OBD_FREE_PTR(lrh);
2174         return rc;
2175 }
2176
2177 static int tgt_check_lookup_req(struct ptlrpc_request *req, int lookup,
2178                                 struct tg_reply_data *trd)
2179 {
2180         struct tg_export_data *ted = &req->rq_export->exp_target_data;
2181         struct lu_target *lut = class_exp2tgt(req->rq_export);
2182         __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
2183         int rc = 0;
2184         struct tg_reply_data *reply;
2185         bool check_increasing;
2186
2187         if (tag == 0)
2188                 return 0;
2189
2190         check_increasing = tgt_is_increasing_xid_client(req->rq_export) &&
2191                            !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2192         if (!lookup && !check_increasing)
2193                 return 0;
2194
2195         list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
2196                 if (lookup && reply->trd_reply.lrd_xid == req->rq_xid) {
2197                         rc = 1;
2198                         if (trd != NULL)
2199                                 *trd = *reply;
2200                         break;
2201                 } else if (check_increasing && reply->trd_tag == tag &&
2202                            reply->trd_reply.lrd_xid > req->rq_xid) {
2203                         rc = -EPROTO;
2204                         CERROR("%s: busy tag=%u req_xid=%llu, trd=%p: xid=%llu transno=%llu client_gen=%u slot_idx=%d: rc = %d\n",
2205                                tgt_name(lut), tag, req->rq_xid, trd,
2206                                reply->trd_reply.lrd_xid,
2207                                reply->trd_reply.lrd_transno,
2208                                reply->trd_reply.lrd_client_gen,
2209                                reply->trd_index, rc);
2210                         break;
2211                 }
2212         }
2213
2214         return rc;
2215 }
2216
2217 /* Look for a reply data matching specified request @req
2218  * A copy is returned in @trd if the pointer is not NULL
2219  */
2220 int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
2221 {
2222         struct tg_export_data *ted = &req->rq_export->exp_target_data;
2223         int found = 0;
2224         bool not_replay = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2225
2226         mutex_lock(&ted->ted_lcd_lock);
2227         if (not_replay && req->rq_xid <= req->rq_export->exp_last_xid) {
2228                 /* A check for the last_xid is needed here in case there is
2229                  * no reply data is left in the list. It may happen if another
2230                  * RPC on another slot increased the last_xid between our
2231                  * process_req_last_xid & tgt_lookup_reply calls */
2232                 found = -EPROTO;
2233         } else {
2234                 found = tgt_check_lookup_req(req, 1, trd);
2235         }
2236         mutex_unlock(&ted->ted_lcd_lock);
2237
2238         CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d last_xid %llu\n",
2239                tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, found,
2240                req->rq_export->exp_last_xid);
2241
2242         return found;
2243 }
2244 EXPORT_SYMBOL(tgt_lookup_reply);
2245
2246 int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
2247 {
2248         struct tg_export_data   *ted = &exp->exp_target_data;
2249         struct lu_target        *lut = class_exp2tgt(exp);
2250         struct tg_reply_data    *trd, *tmp;
2251
2252
2253         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
2254                 if (trd->trd_reply.lrd_xid > rcvd_xid)
2255                         continue;
2256                 ted->ted_release_xid++;
2257                 tgt_release_reply_data(lut, ted, trd);
2258         }
2259
2260         return 0;
2261 }
2262
2263 int tgt_handle_tag(struct ptlrpc_request *req)
2264 {
2265         return tgt_check_lookup_req(req, 0, NULL);
2266 }
2267