Whamcloud - gitweb
LU-11303 quota: enforce block quota for chgrp
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * Lustre Unified Target
32  * These are common function to work with last_received file
33  *
34  * Author: Mikhail Pershin <mike.pershin@intel.com>
35  */
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_fid.h>
39
40 #include "tgt_internal.h"
41
42 /** version recovery epoch: shift separating the epoch (upper bits) from the rest of a transno */
43 #define LR_EPOCH_BITS   32
44
45 /* Allocate a bitmap for a chunk of reply data slots */
46 static int tgt_bitmap_chunk_alloc(struct lu_target *lut, int chunk)
47 {
48         unsigned long *bm;
49
50         OBD_ALLOC_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
51                         sizeof(long));
52         if (bm == NULL)
53                 return -ENOMEM;
54
55         spin_lock(&lut->lut_client_bitmap_lock);
56
57         if (lut->lut_reply_bitmap[chunk] != NULL) {
58                 /* someone else already allocated the bitmap for this chunk */
59                 spin_unlock(&lut->lut_client_bitmap_lock);
60                 OBD_FREE_LARGE(bm, BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
61                          sizeof(long));
62                 return 0;
63         }
64
65         lut->lut_reply_bitmap[chunk] = bm;
66
67         spin_unlock(&lut->lut_client_bitmap_lock);
68
69         return 0;
70 }
71
72 /* Look for an available reply data slot in the bitmap
73  * of the target @lut
74  * Allocate bitmap chunk when first used
75  * XXX algo could be improved if this routine limits performance
76  */
77 static int tgt_find_free_reply_slot(struct lu_target *lut)
78 {
79         unsigned long *bmp;
80         int chunk = 0;
81         int rc;
82         int b;
83
84         for (chunk = 0; chunk < LUT_REPLY_SLOTS_MAX_CHUNKS; chunk++) {
85                 /* allocate the bitmap chunk if necessary */
86                 if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
87                         rc = tgt_bitmap_chunk_alloc(lut, chunk);
88                         if (rc != 0)
89                                 return rc;
90                 }
91                 bmp = lut->lut_reply_bitmap[chunk];
92
93                 /* look for an available slot in this chunk */
94                 do {
95                         b = find_first_zero_bit(bmp, LUT_REPLY_SLOTS_PER_CHUNK);
96                         if (b >= LUT_REPLY_SLOTS_PER_CHUNK)
97                                 break;
98
99                         /* found one */
100                         if (test_and_set_bit(b, bmp) == 0)
101                                 return chunk * LUT_REPLY_SLOTS_PER_CHUNK + b;
102                 } while (true);
103         }
104
105         return -ENOSPC;
106 }
107
108 /* Mark the reply data slot @idx 'used' in the corresponding bitmap chunk
109  * of the target @lut
110  * Allocate the bitmap chunk if necessary
111  */
112 static int tgt_set_reply_slot(struct lu_target *lut, int idx)
113 {
114         int chunk;
115         int b;
116         int rc;
117
118         chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
119         b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
120
121         LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
122         LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
123
124         /* allocate the bitmap chunk if necessary */
125         if (unlikely(lut->lut_reply_bitmap[chunk] == NULL)) {
126                 rc = tgt_bitmap_chunk_alloc(lut, chunk);
127                 if (rc != 0)
128                         return rc;
129         }
130
131         /* mark the slot 'used' in this chunk */
132         if (test_and_set_bit(b, lut->lut_reply_bitmap[chunk]) != 0) {
133                 CERROR("%s: slot %d already set in bitmap\n",
134                        tgt_name(lut), idx);
135                 return -EALREADY;
136         }
137
138         return 0;
139 }
140
141
142 /* Mark the reply data slot @idx 'unused' in the corresponding bitmap chunk
143  * of the target @lut
144  */
145 static int tgt_clear_reply_slot(struct lu_target *lut, int idx)
146 {
147         int chunk;
148         int b;
149
150         if (lut->lut_obd->obd_stopping)
151                 /*
152                  * in case of failover keep the bit set in order to
153                  * avoid overwriting slots in reply_data which might
154                  * be required by resent rpcs
155                  */
156                 return 0;
157         chunk = idx / LUT_REPLY_SLOTS_PER_CHUNK;
158         b = idx % LUT_REPLY_SLOTS_PER_CHUNK;
159
160         LASSERT(chunk < LUT_REPLY_SLOTS_MAX_CHUNKS);
161         LASSERT(b < LUT_REPLY_SLOTS_PER_CHUNK);
162
163         if (lut->lut_reply_bitmap[chunk] == NULL) {
164                 CERROR("%s: slot %d not allocated\n",
165                        tgt_name(lut), idx);
166                 return -ENOENT;
167         }
168
169         if (test_and_clear_bit(b, lut->lut_reply_bitmap[chunk]) == 0) {
170                 CERROR("%s: slot %d already clear in bitmap\n",
171                        tgt_name(lut), idx);
172                 return -EALREADY;
173         }
174
175         return 0;
176 }
177
178
179 /* Read header of reply_data file of target @tgt into structure @lrh */
180 static int tgt_reply_header_read(const struct lu_env *env,
181                                  struct lu_target *tgt,
182                                  struct lsd_reply_header *lrh)
183 {
184         int                      rc;
185         struct lsd_reply_header  buf;
186         struct tgt_thread_info  *tti = tgt_th_info(env);
187
188         tti->tti_off = 0;
189         tti->tti_buf.lb_buf = &buf;
190         tti->tti_buf.lb_len = sizeof(buf);
191
192         rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
193                             &tti->tti_off);
194         if (rc != 0)
195                 return rc;
196
197         lrh->lrh_magic = le32_to_cpu(buf.lrh_magic);
198         lrh->lrh_header_size = le32_to_cpu(buf.lrh_header_size);
199         lrh->lrh_reply_size = le32_to_cpu(buf.lrh_reply_size);
200
201         CDEBUG(D_HA, "%s: read %s header. magic=0x%08x "
202                "header_size=%d reply_size=%d\n",
203                 tgt->lut_obd->obd_name, REPLY_DATA,
204                 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
205
206         return 0;
207 }
208
209 /* Write header into replay_data file of target @tgt from structure @lrh */
210 static int tgt_reply_header_write(const struct lu_env *env,
211                                   struct lu_target *tgt,
212                                   struct lsd_reply_header *lrh)
213 {
214         int                      rc;
215         struct lsd_reply_header  buf;
216         struct tgt_thread_info  *tti = tgt_th_info(env);
217         struct thandle          *th;
218         struct dt_object        *dto;
219
220         CDEBUG(D_HA, "%s: write %s header. magic=0x%08x "
221                "header_size=%d reply_size=%d\n",
222                 tgt->lut_obd->obd_name, REPLY_DATA,
223                 lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
224
225         if (tgt->lut_bottom->dd_rdonly)
226                 RETURN(0);
227
228         buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
229         buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
230         buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
231
232         th = dt_trans_create(env, tgt->lut_bottom);
233         if (IS_ERR(th))
234                 return PTR_ERR(th);
235         th->th_sync = 1;
236
237         tti->tti_off = 0;
238         tti->tti_buf.lb_buf = &buf;
239         tti->tti_buf.lb_len = sizeof(buf);
240
241         rc = dt_declare_record_write(env, tgt->lut_reply_data,
242                                      &tti->tti_buf, tti->tti_off, th);
243         if (rc)
244                 GOTO(out, rc);
245
246         rc = dt_trans_start(env, tgt->lut_bottom, th);
247         if (rc)
248                 GOTO(out, rc);
249
250         dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
251         rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
252 out:
253         dt_trans_stop(env, tgt->lut_bottom, th);
254         return rc;
255 }
256
257 /* Write the reply data @lrd into reply_data file of target @tgt
258  * at offset @off
259  */
260 static int tgt_reply_data_write(const struct lu_env *env, struct lu_target *tgt,
261                                 struct lsd_reply_data *lrd, loff_t off,
262                                 struct thandle *th)
263 {
264         struct tgt_thread_info  *tti = tgt_th_info(env);
265         struct dt_object        *dto;
266         struct lsd_reply_data   *buf = &tti->tti_lrd;
267
268         lrd->lrd_result = ptlrpc_status_hton(lrd->lrd_result);
269
270         buf->lrd_transno         = cpu_to_le64(lrd->lrd_transno);
271         buf->lrd_xid             = cpu_to_le64(lrd->lrd_xid);
272         buf->lrd_data            = cpu_to_le64(lrd->lrd_data);
273         buf->lrd_result          = cpu_to_le32(lrd->lrd_result);
274         buf->lrd_client_gen      = cpu_to_le32(lrd->lrd_client_gen);
275
276         lrd->lrd_result = ptlrpc_status_ntoh(lrd->lrd_result);
277
278         tti->tti_off = off;
279         tti->tti_buf.lb_buf = buf;
280         tti->tti_buf.lb_len = sizeof(*buf);
281
282         dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
283         return dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
284 }
285
286 /* Read the reply data from reply_data file of target @tgt at offset @off
287  * into structure @lrd
288  */
289 static int tgt_reply_data_read(const struct lu_env *env, struct lu_target *tgt,
290                                struct lsd_reply_data *lrd, loff_t off)
291 {
292         int                      rc;
293         struct tgt_thread_info  *tti = tgt_th_info(env);
294         struct lsd_reply_data   *buf = &tti->tti_lrd;
295
296         tti->tti_off = off;
297         tti->tti_buf.lb_buf = buf;
298         tti->tti_buf.lb_len = sizeof(*buf);
299
300         rc = dt_record_read(env, tgt->lut_reply_data, &tti->tti_buf,
301                             &tti->tti_off);
302         if (rc != 0)
303                 return rc;
304
305         lrd->lrd_transno         = le64_to_cpu(buf->lrd_transno);
306         lrd->lrd_xid             = le64_to_cpu(buf->lrd_xid);
307         lrd->lrd_data            = le64_to_cpu(buf->lrd_data);
308         lrd->lrd_result          = le32_to_cpu(buf->lrd_result);
309         lrd->lrd_client_gen      = le32_to_cpu(buf->lrd_client_gen);
310
311         return 0;
312 }
313
314
315 /* Free the in-memory reply data structure @trd and release
316  * the corresponding slot in the reply_data file of target @lut
317  * Called with ted_lcd_lock held
318  */
319 static void tgt_free_reply_data(struct lu_target *lut,
320                                 struct tg_export_data *ted,
321                                 struct tg_reply_data *trd)
322 {
323         CDEBUG(D_TRACE, "%s: free reply data %p: xid %llu, transno %llu, "
324                "client gen %u, slot idx %d\n",
325                lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
326                trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
327                trd->trd_index);
328
329         LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
330
331         list_del(&trd->trd_list);
332         ted->ted_reply_cnt--;
333         if (lut != NULL && trd->trd_index != TRD_INDEX_MEMORY)
334                 tgt_clear_reply_slot(lut, trd->trd_index);
335         OBD_FREE_PTR(trd);
336 }
337
338 /* Release the reply data @trd from target @lut
339  * The reply data with the highest transno for this export
340  * is retained to ensure correctness of target recovery
341  * Called with ted_lcd_lock held
342  */
343 static void tgt_release_reply_data(struct lu_target *lut,
344                                    struct tg_export_data *ted,
345                                    struct tg_reply_data *trd)
346 {
347         CDEBUG(D_TRACE, "%s: release reply data %p: xid %llu, transno %llu, "
348                "client gen %u, slot idx %d\n",
349                lut == NULL ? "" : tgt_name(lut), trd, trd->trd_reply.lrd_xid,
350                trd->trd_reply.lrd_transno, trd->trd_reply.lrd_client_gen,
351                trd->trd_index);
352
353         LASSERT(mutex_is_locked(&ted->ted_lcd_lock));
354
355         /* Do not free the reply data corresponding to the
356          * highest transno of this export.
357          * This ensures on-disk reply data is kept and
358          * last committed transno can be restored from disk in case
359          * of target recovery
360          */
361         if (trd->trd_reply.lrd_transno == ted->ted_lcd->lcd_last_transno) {
362                 /* free previous retained reply */
363                 if (ted->ted_reply_last != NULL)
364                         tgt_free_reply_data(lut, ted, ted->ted_reply_last);
365                 /* retain the reply */
366                 list_del_init(&trd->trd_list);
367                 ted->ted_reply_last = trd;
368         } else {
369                 tgt_free_reply_data(lut, ted, trd);
370         }
371 }
372
373 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
374 {
375         tti->tti_buf.lb_buf = &tti->tti_lsd;
376         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
377         return &tti->tti_buf;
378 }
379
380 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
381 {
382         tti->tti_buf.lb_buf = &tti->tti_lcd;
383         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
384         return &tti->tti_buf;
385 }
386
387 /**
388  * Allocate in-memory data for client slot related to export.
389  */
390 int tgt_client_alloc(struct obd_export *exp)
391 {
392         ENTRY;
393         LASSERT(exp != exp->exp_obd->obd_self_export);
394
395         spin_lock_init(&exp->exp_target_data.ted_nodemap_lock);
396         INIT_LIST_HEAD(&exp->exp_target_data.ted_nodemap_member);
397         spin_lock_init(&exp->exp_target_data.ted_fmd_lock);
398         INIT_LIST_HEAD(&exp->exp_target_data.ted_fmd_list);
399
400         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
401         if (exp->exp_target_data.ted_lcd == NULL)
402                 RETURN(-ENOMEM);
403         /* Mark that slot is not yet valid, 0 doesn't work here */
404         exp->exp_target_data.ted_lr_idx = -1;
405         INIT_LIST_HEAD(&exp->exp_target_data.ted_reply_list);
406         mutex_init(&exp->exp_target_data.ted_lcd_lock);
407         RETURN(0);
408 }
409 EXPORT_SYMBOL(tgt_client_alloc);
410
411 /**
412  * Free in-memory data for client slot related to export.
413  */
414 void tgt_client_free(struct obd_export *exp)
415 {
416         struct tg_export_data   *ted = &exp->exp_target_data;
417         struct lu_target        *lut = class_exp2tgt(exp);
418         struct tg_reply_data    *trd, *tmp;
419
420         LASSERT(exp != exp->exp_obd->obd_self_export);
421
422         tgt_fmd_cleanup(exp);
423
424         /* free reply data */
425         mutex_lock(&ted->ted_lcd_lock);
426         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
427                 tgt_release_reply_data(lut, ted, trd);
428         }
429         if (ted->ted_reply_last != NULL) {
430                 tgt_free_reply_data(lut, ted, ted->ted_reply_last);
431                 ted->ted_reply_last = NULL;
432         }
433         mutex_unlock(&ted->ted_lcd_lock);
434
435         if (!hlist_unhashed(&exp->exp_gen_hash))
436                 cfs_hash_del(exp->exp_obd->obd_gen_hash,
437                              &ted->ted_lcd->lcd_generation,
438                              &exp->exp_gen_hash);
439
440         OBD_FREE_PTR(ted->ted_lcd);
441         ted->ted_lcd = NULL;
442
443         /* Target may have been freed (see LU-7430)
444          * Slot may be not yet assigned */
445         if (exp->exp_obd->u.obt.obt_magic != OBT_MAGIC ||
446             ted->ted_lr_idx < 0)
447                 return;
448
449         /* Clear bit when lcd is freed */
450         LASSERT(lut && lut->lut_client_bitmap);
451         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
452                 CERROR("%s: client %u bit already clear in bitmap\n",
453                        exp->exp_obd->obd_name, ted->ted_lr_idx);
454                 LBUG();
455         }
456
457         if (tgt_is_multimodrpcs_client(exp) && !exp->exp_obd->obd_stopping)
458                 atomic_dec(&lut->lut_num_clients);
459 }
460 EXPORT_SYMBOL(tgt_client_free);
461
462 static inline void tgt_check_lcd(const char *obd_name, int index,
463                                  struct lsd_client_data *lcd)
464 {
465         size_t uuid_size = sizeof(lcd->lcd_uuid);
466
467         if (strnlen((char*)lcd->lcd_uuid, uuid_size) == uuid_size) {
468                 lcd->lcd_uuid[uuid_size - 1] = '\0';
469
470                 LCONSOLE_ERROR("the client UUID (%s) on %s for exports stored in last_rcvd(index = %d) is bad!\n",
471                                lcd->lcd_uuid, obd_name, index);
472         }
473 }
474
475 static int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
476                                 struct lsd_client_data *lcd,
477                                 loff_t *off, int index)
478 {
479         struct tgt_thread_info  *tti = tgt_th_info(env);
480         int                      rc;
481
482         tti_buf_lcd(tti);
483         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
484         if (rc == 0) {
485                 tgt_check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
486                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
487                 lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
488                 lcd->lcd_last_close_result =
489                         ptlrpc_status_ntoh(lcd->lcd_last_close_result);
490         }
491
492         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = %llu"
493                ", last_xid = %llu, last_result = %u, last_data = %u, "
494                "last_close_transno = %llu, last_close_xid = %llu, "
495                "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
496                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
497                lcd->lcd_last_result, lcd->lcd_last_data,
498                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
499                lcd->lcd_last_close_result, rc);
500         return rc;
501 }
502
503 static int tgt_client_data_write(const struct lu_env *env,
504                                  struct lu_target *tgt,
505                                  struct lsd_client_data *lcd,
506                                  loff_t *off, struct thandle *th)
507 {
508         struct tgt_thread_info *tti = tgt_th_info(env);
509         struct dt_object        *dto;
510
511         lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
512         lcd->lcd_last_close_result =
513                 ptlrpc_status_hton(lcd->lcd_last_close_result);
514         lcd_cpu_to_le(lcd, &tti->tti_lcd);
515         tti_buf_lcd(tti);
516
517         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
518         return dt_record_write(env, dto, &tti->tti_buf, off, th);
519 }
520
521 struct tgt_new_client_callback {
522         struct dt_txn_commit_cb  lncc_cb;
523         struct obd_export       *lncc_exp;
524 };
525
526 static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
527                               struct dt_txn_commit_cb *cb, int err)
528 {
529         struct tgt_new_client_callback *ccb;
530
531         ccb = container_of(cb, struct tgt_new_client_callback, lncc_cb);
532
533         LASSERT(ccb->lncc_exp->exp_obd);
534
535         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
536                ccb->lncc_exp->exp_obd->obd_name,
537                ccb->lncc_exp->exp_client_uuid.uuid);
538
539         spin_lock(&ccb->lncc_exp->exp_lock);
540
541         ccb->lncc_exp->exp_need_sync = 0;
542
543         spin_unlock(&ccb->lncc_exp->exp_lock);
544         class_export_cb_put(ccb->lncc_exp);
545
546         OBD_FREE_PTR(ccb);
547 }
548
549 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
550 {
551         struct tgt_new_client_callback  *ccb;
552         struct dt_txn_commit_cb         *dcb;
553         int                              rc;
554
555         OBD_ALLOC_PTR(ccb);
556         if (ccb == NULL)
557                 return -ENOMEM;
558
559         ccb->lncc_exp = class_export_cb_get(exp);
560
561         dcb = &ccb->lncc_cb;
562         dcb->dcb_func = tgt_cb_new_client;
563         INIT_LIST_HEAD(&dcb->dcb_linkage);
564         strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
565
566         rc = dt_trans_cb_add(th, dcb);
567         if (rc) {
568                 class_export_cb_put(exp);
569                 OBD_FREE_PTR(ccb);
570         }
571         return rc;
572 }
573
574 /**
575  * Update client data in last_rcvd
576  */
577 static int tgt_client_data_update(const struct lu_env *env,
578                                   struct obd_export *exp)
579 {
580         struct tg_export_data   *ted = &exp->exp_target_data;
581         struct lu_target        *tgt = class_exp2tgt(exp);
582         struct tgt_thread_info  *tti = tgt_th_info(env);
583         struct thandle          *th;
584         int                      rc = 0;
585
586         ENTRY;
587
588         if (unlikely(tgt == NULL)) {
589                 CDEBUG(D_ERROR, "%s: No target for connected export\n",
590                           class_exp2obd(exp)->obd_name);
591                 RETURN(-EINVAL);
592         }
593
594         if (tgt->lut_bottom->dd_rdonly)
595                 RETURN(0);
596
597         th = dt_trans_create(env, tgt->lut_bottom);
598         if (IS_ERR(th))
599                 RETURN(PTR_ERR(th));
600
601         tti_buf_lcd(tti);
602         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
603                                      &tti->tti_buf,
604                                      ted->ted_lr_off, th);
605         if (rc)
606                 GOTO(out, rc);
607
608         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
609         if (rc)
610                 GOTO(out, rc);
611
612         mutex_lock(&ted->ted_lcd_lock);
613
614         /*
615          * Until this operations will be committed the sync is needed
616          * for this export. This should be done _after_ starting the
617          * transaction so that many connecting clients will not bring
618          * server down with lots of sync writes.
619          */
620         rc = tgt_new_client_cb_add(th, exp);
621         if (rc) {
622                 /* can't add callback, do sync now */
623                 th->th_sync = 1;
624         } else {
625                 spin_lock(&exp->exp_lock);
626                 exp->exp_need_sync = 1;
627                 spin_unlock(&exp->exp_lock);
628         }
629
630         tti->tti_off = ted->ted_lr_off;
631         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
632
633         mutex_unlock(&ted->ted_lcd_lock);
634
635         EXIT;
636 out:
637         dt_trans_stop(env, tgt->lut_bottom, th);
638         CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
639                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
640                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
641
642         return rc;
643 }
644
645 static int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
646 {
647         struct tgt_thread_info  *tti = tgt_th_info(env);
648         int                      rc;
649
650         tti->tti_off = 0;
651         tti_buf_lsd(tti);
652         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
653                             &tti->tti_off);
654         if (rc == 0)
655                 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
656
657         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
658                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
659                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
660         return rc;
661 }
662
663 static int tgt_server_data_write(const struct lu_env *env,
664                                  struct lu_target *tgt, struct thandle *th)
665 {
666         struct tgt_thread_info  *tti = tgt_th_info(env);
667         struct dt_object        *dto;
668         int                      rc;
669
670         ENTRY;
671
672         tti->tti_off = 0;
673         tti_buf_lsd(tti);
674         lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
675
676         dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
677         rc = dt_record_write(env, dto, &tti->tti_buf, &tti->tti_off, th);
678
679         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
680                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
681                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
682
683         RETURN(rc);
684 }
685
686 /**
687  * Update server data in last_rcvd
688  */
689 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
690                            int sync)
691 {
692         struct tgt_thread_info  *tti = tgt_th_info(env);
693         struct thandle          *th;
694         int                      rc = 0;
695
696         ENTRY;
697
698         CDEBUG(D_SUPER,
699                "%s: mount_count is %llu, last_transno is %llu\n",
700                tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
701                tgt->lut_last_transno);
702
703         /* Always save latest transno to keep it fresh */
704         spin_lock(&tgt->lut_translock);
705         tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
706         spin_unlock(&tgt->lut_translock);
707
708         if (tgt->lut_bottom->dd_rdonly)
709                 RETURN(0);
710
711         th = dt_trans_create(env, tgt->lut_bottom);
712         if (IS_ERR(th))
713                 RETURN(PTR_ERR(th));
714
715         th->th_sync = sync;
716
717         tti_buf_lsd(tti);
718         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
719                                      &tti->tti_buf, tti->tti_off, th);
720         if (rc)
721                 GOTO(out, rc);
722
723         rc = dt_trans_start(env, tgt->lut_bottom, th);
724         if (rc)
725                 GOTO(out, rc);
726
727         rc = tgt_server_data_write(env, tgt, th);
728 out:
729         dt_trans_stop(env, tgt->lut_bottom, th);
730
731         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
732                "last_transno = %llu: rc = %d\n", tgt->lut_obd->obd_name,
733                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
734         RETURN(rc);
735 }
736 EXPORT_SYMBOL(tgt_server_data_update);
737
738 static int tgt_truncate_last_rcvd(const struct lu_env *env,
739                                   struct lu_target *tgt, loff_t size)
740 {
741         struct dt_object *dt = tgt->lut_last_rcvd;
742         struct thandle   *th;
743         struct lu_attr    attr;
744         int               rc;
745
746         ENTRY;
747
748         if (tgt->lut_bottom->dd_rdonly)
749                 RETURN(0);
750
751         attr.la_size = size;
752         attr.la_valid = LA_SIZE;
753
754         th = dt_trans_create(env, tgt->lut_bottom);
755         if (IS_ERR(th))
756                 RETURN(PTR_ERR(th));
757         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
758         if (rc)
759                 GOTO(cleanup, rc);
760         rc = dt_declare_attr_set(env, dt, &attr, th);
761         if (rc)
762                 GOTO(cleanup, rc);
763         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
764         if (rc)
765                 GOTO(cleanup, rc);
766
767         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th);
768         if (rc == 0)
769                 rc = dt_attr_set(env, dt, &attr, th);
770
771 cleanup:
772         dt_trans_stop(env, tgt->lut_bottom, th);
773
774         RETURN(rc);
775 }
776
777 static void tgt_client_epoch_update(const struct lu_env *env,
778                                     struct obd_export *exp)
779 {
780         struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
781         struct lu_target        *tgt = class_exp2tgt(exp);
782
783         LASSERT(tgt && tgt->lut_bottom);
784         /** VBR: set client last_epoch to current epoch */
785         if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
786                 return;
787         lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
788         tgt_client_data_update(env, exp);
789 }
790
791 /**
792  * Update boot epoch when recovery ends
793  */
794 void tgt_boot_epoch_update(struct lu_target *tgt)
795 {
796         struct lu_env            env;
797         struct ptlrpc_request   *req;
798         __u32                    start_epoch;
799         LIST_HEAD(client_list);
800         int                      rc;
801
802         if (tgt->lut_obd->obd_stopping)
803                 return;
804
805         rc = lu_env_init(&env, LCT_DT_THREAD);
806         if (rc) {
807                 CERROR("%s: can't initialize environment: rc = %d\n",
808                         tgt->lut_obd->obd_name, rc);
809                 return;
810         }
811
812         spin_lock(&tgt->lut_translock);
813         start_epoch = (tgt->lut_last_transno >> LR_EPOCH_BITS) + 1;
814         tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
815         tgt->lut_lsd.lsd_start_epoch = start_epoch;
816         spin_unlock(&tgt->lut_translock);
817
818         /**
819          * The recovery is not yet finished and final queue can still be updated
820          * with resend requests. Move final list to separate one for processing
821          */
822         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
823         list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
824         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
825
826         /**
827          * go through list of exports participated in recovery and
828          * set new epoch for them
829          */
830         list_for_each_entry(req, &client_list, rq_list) {
831                 LASSERT(!req->rq_export->exp_delayed);
832                 if (!req->rq_export->exp_vbr_failed)
833                         tgt_client_epoch_update(&env, req->rq_export);
834         }
835         /** return list back at once */
836         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
837         list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
838         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
839
840         /** Clear MULTI RPCS incompatibility flag if
841          * - target is MDT and
842          * - there is no client to recover or the recovery was aborted
843          */
844         if (!strncmp(tgt->lut_obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) &&
845             (atomic_read(&tgt->lut_obd->obd_max_recoverable_clients) == 0 ||
846             tgt->lut_obd->obd_abort_recovery))
847                 tgt->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
848
849         /** update server epoch */
850         tgt_server_data_update(&env, tgt, 1);
851         lu_env_fini(&env);
852 }
853
/**
 * commit callback, need to update last_committed value
 *
 * One instance is allocated per transaction by tgt_last_commit_cb_add().
 * It is freed by tgt_cb_last_committed() when the callback runs, or by
 * tgt_last_commit_cb_add() itself if the callback cannot be registered.
 */
struct tgt_last_committed_callback {
        /* embedded callback descriptor; container_of() on it yields this
         * structure inside tgt_cb_last_committed() */
        struct dt_txn_commit_cb  llcc_cb;
        /* target whose obd_last_committed is advanced by the callback */
        struct lu_target        *llcc_tgt;
        /* export taken with class_export_cb_get() and released with
         * class_export_cb_put() */
        struct obd_export       *llcc_exp;
        /* transaction number recorded when the callback was added */
        __u64                    llcc_transno;
};
863
864 static void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
865                                   struct dt_txn_commit_cb *cb, int err)
866 {
867         struct tgt_last_committed_callback *ccb;
868
869         ccb = container_of(cb, struct tgt_last_committed_callback, llcc_cb);
870
871         LASSERT(ccb->llcc_exp);
872         LASSERT(ccb->llcc_tgt != NULL);
873         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
874
875         if (th->th_reserved_quota.qrr_count > 0) {
876                 struct lu_env            temp_env;
877                 int rc;
878
879                 CDEBUG(D_QUOTA, "free quota %llu %llu\n",
880                        th->th_reserved_quota.qrr_id.qid_gid,
881                        th->th_reserved_quota.qrr_count);
882
883                 rc = lu_env_init(&temp_env, LCT_DT_THREAD);
884                 if (rc) {
885                         CERROR("%s: can't initialize environment: rc = %d\n",
886                                ccb->llcc_tgt->lut_obd->obd_name, rc);
887                         goto out;
888                 }
889
890                 dt_reserve_or_free_quota(&temp_env, th->th_dev,
891                                          th->th_reserved_quota.qrr_type,
892                                          th->th_reserved_quota.qrr_id.qid_uid,
893                                          th->th_reserved_quota.qrr_id.qid_gid,
894                                          -th->th_reserved_quota.qrr_count,
895                                          false);
896                 lu_env_fini(&temp_env);
897         }
898
899         /* error hit, don't update last committed to provide chance to
900          * replay data after fail */
901         if (err != 0)
902                 goto out;
903
904         /* Fast path w/o spinlock, if exp_last_committed was updated
905          * with higher transno, no need to take spinlock and check,
906          * also no need to update obd_last_committed. */
907         if (ccb->llcc_transno <= ccb->llcc_exp->exp_last_committed)
908                 goto out;
909         spin_lock(&ccb->llcc_tgt->lut_translock);
910         if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
911                 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
912
913         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
914                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
915                 spin_unlock(&ccb->llcc_tgt->lut_translock);
916
917                 ptlrpc_commit_replies(ccb->llcc_exp);
918                 tgt_cancel_slc_locks(ccb->llcc_tgt, ccb->llcc_transno);
919         } else {
920                 spin_unlock(&ccb->llcc_tgt->lut_translock);
921         }
922
923         CDEBUG(D_HA, "%s: transno %lld is committed\n",
924                ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
925
926 out:
927         class_export_cb_put(ccb->llcc_exp);
928         OBD_FREE_PTR(ccb);
929 }
930
931 /**
932  * Add commit callback function, it returns a non-zero value to inform
933  * caller to use sync transaction if necessary.
934  */
935 static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
936                                   struct obd_export *exp, __u64 transno)
937 {
938         struct tgt_last_committed_callback      *ccb;
939         struct dt_txn_commit_cb                 *dcb;
940         int                                      rc;
941
942         OBD_ALLOC_PTR(ccb);
943         if (ccb == NULL)
944                 return -ENOMEM;
945
946         ccb->llcc_tgt = tgt;
947         ccb->llcc_exp = class_export_cb_get(exp);
948         ccb->llcc_transno = transno;
949
950         dcb = &ccb->llcc_cb;
951         dcb->dcb_func = tgt_cb_last_committed;
952         INIT_LIST_HEAD(&dcb->dcb_linkage);
953         strlcpy(dcb->dcb_name, "tgt_cb_last_committed", sizeof(dcb->dcb_name));
954
955         rc = dt_trans_cb_add(th, dcb);
956         if (rc) {
957                 class_export_cb_put(exp);
958                 OBD_FREE_PTR(ccb);
959         }
960
961         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
962                 /* report failure to force synchronous operation */
963                 return -EPERM;
964
965         /* if exp_need_sync is set, return non-zero value to force
966          * a sync transaction. */
967         return rc ? rc : exp->exp_need_sync;
968 }
969
970 static int tgt_is_local_client(const struct lu_env *env,
971                                       struct obd_export *exp)
972 {
973         struct lu_target        *tgt = class_exp2tgt(exp);
974         struct tgt_session_info *tsi = tgt_ses_info(env);
975         struct ptlrpc_request   *req = tgt_ses_req(tsi);
976
977         if (exp_connect_flags(exp) & OBD_CONNECT_MDS ||
978             exp_connect_flags(exp) & OBD_CONNECT_MDS_MDS)
979                 return 0;
980         if (tgt->lut_local_recovery)
981                 return 0;
982         if (!req)
983                 return 0;
984         if (!LNetIsPeerLocal(req->rq_peer.nid))
985                 return 0;
986
987         return 1;
988 }
989
990 /**
991  * Add new client to the last_rcvd upon new connection.
992  *
993  * We use a bitmap to locate a free space in the last_rcvd file and initialize
994  * tg_export_data.
995  */
996 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
997 {
998         struct tg_export_data   *ted = &exp->exp_target_data;
999         struct lu_target        *tgt = class_exp2tgt(exp);
1000         int                      rc = 0, idx;
1001
1002         ENTRY;
1003
1004         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
1005         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
1006                 RETURN(0);
1007
1008         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1009                 RETURN(0);
1010
1011         if (tgt_is_local_client(env, exp)) {
1012                 LCONSOLE_WARN("%s: local client %s w/o recovery\n",
1013                               exp->exp_obd->obd_name, ted->ted_lcd->lcd_uuid);
1014                 exp->exp_no_recovery = 1;
1015                 RETURN(0);
1016         }
1017
1018         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
1019          * there's no need for extra complication here
1020          */
1021         idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
1022 repeat:
1023         if (idx >= LR_MAX_CLIENTS ||
1024             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
1025                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
1026                        tgt->lut_obd->obd_name,  idx);
1027                 RETURN(-EOVERFLOW);
1028         }
1029         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1030                 idx = find_next_zero_bit(tgt->lut_client_bitmap,
1031                                              LR_MAX_CLIENTS, idx);
1032                 goto repeat;
1033         }
1034
1035         ted->ted_lr_idx = idx;
1036         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1037                           idx * tgt->lut_lsd.lsd_client_size;
1038
1039         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
1040
1041         if (tgt_is_multimodrpcs_client(exp)) {
1042                 /* Set MULTI RPCS incompatibility flag to prevent previous
1043                  * Lustre versions to mount a target with reply_data file */
1044                 atomic_inc(&tgt->lut_num_clients);
1045                 if (!(tgt->lut_lsd.lsd_feature_incompat &
1046                       OBD_INCOMPAT_MULTI_RPCS)) {
1047                         tgt->lut_lsd.lsd_feature_incompat |=
1048                                                         OBD_INCOMPAT_MULTI_RPCS;
1049                         rc = tgt_server_data_update(env, tgt, 1);
1050                         if (rc < 0) {
1051                                 CERROR("%s: unable to set MULTI RPCS "
1052                                        "incompatibility flag\n",
1053                                        exp->exp_obd->obd_name);
1054                                 RETURN(rc);
1055                         }
1056                 }
1057
1058                 /* assign client slot generation */
1059                 ted->ted_lcd->lcd_generation =
1060                                 atomic_inc_return(&tgt->lut_client_generation);
1061         } else {
1062                 ted->ted_lcd->lcd_generation = 0;
1063         }
1064
1065         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s' "
1066                "generation %d\n",
1067                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1068                ted->ted_lcd->lcd_uuid, ted->ted_lcd->lcd_generation);
1069
1070         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
1071                 RETURN(-ENOSPC);
1072
1073         rc = tgt_client_data_update(env, exp);
1074         if (rc)
1075                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
1076                        tgt->lut_obd->obd_name, idx, rc);
1077
1078         RETURN(rc);
1079 }
1080 EXPORT_SYMBOL(tgt_client_new);
1081
1082 /* Add an existing client to the MDS in-memory state based on
1083  * a client that was previously found in the last_rcvd file and
1084  * already has an assigned slot (idx >= 0).
1085  *
1086  * It should not be possible to fail adding an existing client - otherwise
1087  * mdt_init_server_data() callsite needs to be fixed.
1088  */
1089 int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
1090 {
1091         struct tg_export_data   *ted = &exp->exp_target_data;
1092         struct lu_target        *tgt = class_exp2tgt(exp);
1093
1094         ENTRY;
1095
1096         LASSERT(tgt && tgt->lut_client_bitmap != NULL);
1097         LASSERTF(idx >= 0, "%d\n", idx);
1098
1099         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
1100             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
1101                 RETURN(0);
1102
1103         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
1104                 CERROR("%s: client %d: bit already set in bitmap!!\n",
1105                        tgt->lut_obd->obd_name,  idx);
1106                 LBUG();
1107         }
1108         atomic_inc(&tgt->lut_num_clients);
1109
1110         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added, "
1111                "generation %d\n",
1112                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid,
1113                ted->ted_lcd->lcd_generation);
1114
1115         ted->ted_lr_idx = idx;
1116         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
1117                           idx * tgt->lut_lsd.lsd_client_size;
1118
1119         mutex_init(&ted->ted_lcd_lock);
1120
1121         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
1122
1123         RETURN(0);
1124 }
1125
1126 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
1127 {
1128         struct tg_export_data   *ted = &exp->exp_target_data;
1129         struct lu_target        *tgt = class_exp2tgt(exp);
1130         int                      rc;
1131
1132         ENTRY;
1133
1134         LASSERT(ted->ted_lcd);
1135
1136         if (unlikely(tgt == NULL)) {
1137                 CDEBUG(D_ERROR, "%s: No target for connected export\n",
1138                        class_exp2obd(exp)->obd_name);
1139                 RETURN(-EINVAL);
1140         }
1141
1142         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
1143         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
1144                     (char *)tgt->lut_obd->obd_uuid.uuid) ||
1145             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT ||
1146             exp->exp_no_recovery)
1147                 RETURN(0);
1148
1149         /* Slot may be not yet assigned, use case is race between Client
1150          * reconnect and forced eviction */
1151         if (ted->ted_lr_idx < 0) {
1152                 CWARN("%s: client with UUID '%s' not in bitmap\n",
1153                       tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid);
1154                 RETURN(0);
1155         }
1156
1157         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
1158                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
1159                ted->ted_lcd->lcd_uuid);
1160
1161         /* Clear the bit _after_ zeroing out the client so we don't
1162            race with filter_client_add and zero out new clients.*/
1163         if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
1164                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
1165                        tgt->lut_obd->obd_name, ted->ted_lr_idx);
1166                 LBUG();
1167         }
1168
1169         /* Do not erase record for recoverable client. */
1170         if (exp->exp_flags & OBD_OPT_FAILOVER)
1171                 RETURN(0);
1172
1173         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_DEL))
1174                 RETURN(0);
1175
1176         /* Make sure the server's last_transno is up to date.
1177          * This should be done before zeroing client slot so last_transno will
1178          * be in server data or in client data in case of failure */
1179         rc = tgt_server_data_update(env, tgt, 0);
1180         if (rc != 0) {
1181                 CERROR("%s: failed to update server data, skip client %s "
1182                        "zeroing, rc %d\n", tgt->lut_obd->obd_name,
1183                        ted->ted_lcd->lcd_uuid, rc);
1184                 RETURN(rc);
1185         }
1186
1187         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
1188         rc = tgt_client_data_update(env, exp);
1189
1190         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
1191                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
1192                tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
1193                ted->ted_lr_idx, ted->ted_lr_off, rc);
1194         RETURN(rc);
1195 }
1196 EXPORT_SYMBOL(tgt_client_del);
1197
1198 static void tgt_clean_by_tag(struct obd_export *exp, __u64 xid, __u16 tag)
1199 {
1200         struct tg_export_data   *ted = &exp->exp_target_data;
1201         struct lu_target        *lut = class_exp2tgt(exp);
1202         struct tg_reply_data    *trd, *tmp;
1203
1204         if (tag == 0)
1205                 return;
1206
1207         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
1208                 if (trd->trd_tag != tag)
1209                         continue;
1210
1211                 LASSERT(ergo(tgt_is_increasing_xid_client(exp),
1212                              trd->trd_reply.lrd_xid <= xid));
1213
1214                 ted->ted_release_tag++;
1215                 tgt_release_reply_data(lut, ted, trd);
1216         }
1217 }
1218
1219 static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
1220                        struct tg_export_data *ted, struct tg_reply_data *trd,
1221                        struct ptlrpc_request *req,
1222                        struct thandle *th, bool update_lrd_file)
1223 {
1224         struct lsd_reply_data   *lrd;
1225         int     i;
1226         int     rc;
1227
1228         lrd = &trd->trd_reply;
1229         /* update export last transno */
1230         mutex_lock(&ted->ted_lcd_lock);
1231         if (lrd->lrd_transno > ted->ted_lcd->lcd_last_transno)
1232                 ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
1233         mutex_unlock(&ted->ted_lcd_lock);
1234
1235         if (tgt != NULL) {
1236                 /* find a empty slot */
1237                 i = tgt_find_free_reply_slot(tgt);
1238                 if (unlikely(i < 0)) {
1239                         CERROR("%s: couldn't find a slot for reply data: "
1240                                "rc = %d\n", tgt_name(tgt), i);
1241                         RETURN(i);
1242                 }
1243                 trd->trd_index = i;
1244
1245                 if (update_lrd_file) {
1246                         loff_t  off;
1247
1248                         /* write reply data to disk */
1249                         off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
1250                         rc = tgt_reply_data_write(env, tgt, lrd, off, th);
1251                         if (unlikely(rc != 0)) {
1252                                 CERROR("%s: can't update %s file: rc = %d\n",
1253                                        tgt_name(tgt), REPLY_DATA, rc);
1254                                 GOTO(free_slot, rc);
1255                         }
1256                 }
1257         } else {
1258                 trd->trd_index = TRD_INDEX_MEMORY;
1259         }
1260
1261         /* add reply data to target export's reply list */
1262         mutex_lock(&ted->ted_lcd_lock);
1263         if (req != NULL) {
1264                 int exclude = tgt_is_increasing_xid_client(req->rq_export) ?
1265                               MSG_REPLAY : MSG_REPLAY|MSG_RESENT;
1266
1267                 if (req->rq_obsolete) {
1268                         CDEBUG(D_INFO,
1269                                "drop reply data update for obsolete req xid=%llu,"
1270                                "transno=%llu, tag=%hu\n", req->rq_xid,
1271                                lrd->lrd_transno, trd->trd_tag);
1272                         mutex_unlock(&ted->ted_lcd_lock);
1273                         GOTO(free_slot, rc = -EBADR);
1274                 }
1275
1276                 if (!(lustre_msg_get_flags(req->rq_reqmsg) & exclude))
1277                         tgt_clean_by_tag(req->rq_export, req->rq_xid,
1278                                          trd->trd_tag);
1279         }
1280         list_add(&trd->trd_list, &ted->ted_reply_list);
1281         ted->ted_reply_cnt++;
1282         if (ted->ted_reply_cnt > ted->ted_reply_max)
1283                 ted->ted_reply_max = ted->ted_reply_cnt;
1284         mutex_unlock(&ted->ted_lcd_lock);
1285
1286         CDEBUG(D_TRACE, "add reply %p: xid %llu, transno %llu, "
1287                "tag %hu, client gen %u, slot idx %d\n",
1288                trd, lrd->lrd_xid, lrd->lrd_transno,
1289                trd->trd_tag, lrd->lrd_client_gen, trd->trd_index);
1290
1291         RETURN(0);
1292
1293 free_slot:
1294         if (tgt != NULL)
1295                 tgt_clear_reply_slot(tgt, trd->trd_index);
1296         return rc;
1297 }
1298
1299 int tgt_mk_reply_data(const struct lu_env *env,
1300                       struct lu_target *tgt,
1301                       struct tg_export_data *ted,
1302                       struct ptlrpc_request *req,
1303                       __u64 opdata,
1304                       struct thandle *th,
1305                       bool write_update,
1306                       __u64 transno)
1307 {
1308         struct tg_reply_data    *trd;
1309         struct lsd_reply_data   *lrd;
1310         __u64                   *pre_versions = NULL;
1311         int                     rc;
1312
1313         OBD_ALLOC_PTR(trd);
1314         if (unlikely(trd == NULL))
1315                 RETURN(-ENOMEM);
1316
1317         /* fill reply data information */
1318         lrd = &trd->trd_reply;
1319         lrd->lrd_transno = transno;
1320         if (req != NULL) {
1321                 lrd->lrd_xid = req->rq_xid;
1322                 trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
1323                 lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
1324                 if (write_update) {
1325                         pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1326                         lrd->lrd_result = th->th_result;
1327                 }
1328         } else {
1329                 struct tgt_session_info *tsi;
1330
1331                 LASSERT(env != NULL);
1332                 tsi = tgt_ses_info(env);
1333                 LASSERT(tsi->tsi_xid != 0);
1334
1335                 lrd->lrd_xid = tsi->tsi_xid;
1336                 lrd->lrd_result = tsi->tsi_result;
1337                 lrd->lrd_client_gen = tsi->tsi_client_gen;
1338         }
1339
1340         lrd->lrd_data = opdata;
1341         if (pre_versions) {
1342                 trd->trd_pre_versions[0] = pre_versions[0];
1343                 trd->trd_pre_versions[1] = pre_versions[1];
1344                 trd->trd_pre_versions[2] = pre_versions[2];
1345                 trd->trd_pre_versions[3] = pre_versions[3];
1346         }
1347
1348         rc = tgt_add_reply_data(env, tgt, ted, trd, req,
1349                                 th, write_update);
1350         if (rc < 0) {
1351                 OBD_FREE_PTR(trd);
1352                 if (rc == -EBADR)
1353                         rc = 0;
1354         }
1355         return rc;
1356
1357 }
1358 EXPORT_SYMBOL(tgt_mk_reply_data);
1359
1360 /*
1361  * last_rcvd & last_committed update callbacks
1362  */
1363 static int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
1364                                 struct dt_object *obj, __u64 opdata,
1365                                 struct thandle *th, struct ptlrpc_request *req)
1366 {
1367         struct tgt_thread_info  *tti = tgt_th_info(env);
1368         struct tgt_session_info *tsi = tgt_ses_info(env);
1369         struct obd_export *exp = tsi->tsi_exp;
1370         struct tg_export_data *ted;
1371         __u64 *transno_p;
1372         bool nolcd = false;
1373         int rc = 0;
1374
1375         ENTRY;
1376
1377
1378         LASSERT(exp != NULL);
1379         ted = &exp->exp_target_data;
1380
1381         /* Some clients don't support recovery, and they don't have last_rcvd
1382          * client data:
1383          * 1. lightweight clients.
1384          * 2. local clients on MDS which doesn't enable "localrecov".
1385          * 3. OFD connect may cause transaction before export has last_rcvd
1386          *    slot.
1387          */
1388         if (ted->ted_lr_idx < 0)
1389                 nolcd = true;
1390
1391         if (req != NULL)
1392                 tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1393         else
1394                 /* From update replay, tti_transno should be set already */
1395                 LASSERT(tti->tti_transno != 0);
1396
1397         spin_lock(&tgt->lut_translock);
1398         if (th->th_result != 0) {
1399                 if (tti->tti_transno != 0) {
1400                         CERROR("%s: replay transno %llu failed: rc = %d\n",
1401                                tgt_name(tgt), tti->tti_transno, th->th_result);
1402                 }
1403         } else if (tti->tti_transno == 0) {
1404                 tti->tti_transno = ++tgt->lut_last_transno;
1405         } else {
1406                 /* should be replay */
1407                 if (tti->tti_transno > tgt->lut_last_transno)
1408                         tgt->lut_last_transno = tti->tti_transno;
1409         }
1410         spin_unlock(&tgt->lut_translock);
1411
1412         /** VBR: set new versions */
1413         if (th->th_result == 0 && obj != NULL) {
1414                 struct dt_object *dto = dt_object_locate(obj, th->th_dev);
1415                 dt_version_set(env, dto, tti->tti_transno, th);
1416         }
1417
1418         /* filling reply data */
1419         CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
1420                tti->tti_transno, tgt->lut_obd->obd_last_committed);
1421
1422         if (req != NULL) {
1423                 req->rq_transno = tti->tti_transno;
1424                 lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
1425         }
1426
1427         /* if can't add callback, do sync write */
1428         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp, tti->tti_transno);
1429
1430         if (nolcd) {
1431                 /* store transno in the last_rcvd header */
1432                 spin_lock(&tgt->lut_translock);
1433                 if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
1434                         tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
1435                         spin_unlock(&tgt->lut_translock);
1436                         /* Although current connection doesn't have slot
1437                          * in the last_rcvd, we still want to maintain
1438                          * the in-memory lsd_client_data structure in order to
1439                          * properly handle reply reconstruction. */
1440                         rc = tgt_server_data_write(env, tgt, th);
1441                 } else {
1442                         spin_unlock(&tgt->lut_translock);
1443                 }
1444         } else if (ted->ted_lr_off == 0) {
1445                 CERROR("%s: client idx %d has offset %lld\n",
1446                        tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
1447                 RETURN(-EINVAL);
1448         }
1449
1450         /* Target that supports multiple reply data */
1451         if (tgt_is_multimodrpcs_client(exp)) {
1452                 return tgt_mk_reply_data(env, tgt, ted, req, opdata, th,
1453                                          !!(req != NULL), tti->tti_transno);
1454         }
1455
1456         /* Enough for update replay, let's return */
1457         if (req == NULL)
1458                 RETURN(rc);
1459
1460         mutex_lock(&ted->ted_lcd_lock);
1461         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1462         if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE) {
1463                 transno_p = &ted->ted_lcd->lcd_last_close_transno;
1464                 ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
1465                 ted->ted_lcd->lcd_last_close_result = th->th_result;
1466         } else {
1467                 /* VBR: save versions in last_rcvd for reconstruct. */
1468                 __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
1469
1470                 if (pre_versions) {
1471                         ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
1472                         ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
1473                         ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
1474                         ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
1475                 }
1476                 transno_p = &ted->ted_lcd->lcd_last_transno;
1477                 ted->ted_lcd->lcd_last_xid = req->rq_xid;
1478                 ted->ted_lcd->lcd_last_result = th->th_result;
1479                 /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
1480                  * see struct ldlm_reply->lock_policy_res1; */
1481                 ted->ted_lcd->lcd_last_data = opdata;
1482         }
1483
1484         /* Update transno in slot only if non-zero number, i.e. no errors */
1485         if (likely(tti->tti_transno != 0)) {
1486                 /* Don't overwrite bigger transaction number with lower one.
1487                  * That is not sign of problem in all cases, but in any case
1488                  * this value should be monotonically increased only. */
1489                 if (*transno_p > tti->tti_transno) {
1490                         if (!tgt->lut_no_reconstruct) {
1491                                 CERROR("%s: trying to overwrite bigger transno:"
1492                                        "on-disk: %llu, new: %llu replay: "
1493                                        "%d. See LU-617.\n", tgt_name(tgt),
1494                                        *transno_p, tti->tti_transno,
1495                                        req_is_replay(req));
1496                                 if (req_is_replay(req)) {
1497                                         spin_lock(&req->rq_export->exp_lock);
1498                                         req->rq_export->exp_vbr_failed = 1;
1499                                         spin_unlock(&req->rq_export->exp_lock);
1500                                 }
1501                                 mutex_unlock(&ted->ted_lcd_lock);
1502                                 RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
1503                         }
1504                 } else {
1505                         *transno_p = tti->tti_transno;
1506                 }
1507         }
1508
1509         if (!nolcd) {
1510                 tti->tti_off = ted->ted_lr_off;
1511                 if (CFS_FAIL_CHECK(OBD_FAIL_TGT_RCVD_EIO))
1512                         rc = -EIO;
1513                 else
1514                         rc = tgt_client_data_write(env, tgt, ted->ted_lcd,
1515                                                    &tti->tti_off, th);
1516                 if (rc < 0) {
1517                         mutex_unlock(&ted->ted_lcd_lock);
1518                         RETURN(rc);
1519                 }
1520         }
1521         mutex_unlock(&ted->ted_lcd_lock);
1522         RETURN(rc);
1523 }
1524
1525 /*
1526  * last_rcvd update for echo client simulation.
1527  * It updates last_rcvd client slot and version of object in
1528  * simple way but with all locks to simulate all drawbacks
1529  */
1530 static int tgt_last_rcvd_update_echo(const struct lu_env *env,
1531                                      struct lu_target *tgt,
1532                                      struct dt_object *obj,
1533                                      struct thandle *th,
1534                                      struct obd_export *exp)
1535 {
1536         struct tgt_thread_info  *tti = tgt_th_info(env);
1537         struct tg_export_data   *ted = &exp->exp_target_data;
1538         int                      rc = 0;
1539
1540         ENTRY;
1541
1542         tti->tti_transno = 0;
1543
1544         spin_lock(&tgt->lut_translock);
1545         if (th->th_result == 0)
1546                 tti->tti_transno = ++tgt->lut_last_transno;
1547         spin_unlock(&tgt->lut_translock);
1548
1549         /** VBR: set new versions */
1550         if (th->th_result == 0 && obj != NULL)
1551                 dt_version_set(env, obj, tti->tti_transno, th);
1552
1553         /* if can't add callback, do sync write */
1554         th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
1555                                                 tti->tti_transno);
1556
1557         LASSERT(ted->ted_lr_off > 0);
1558
1559         mutex_lock(&ted->ted_lcd_lock);
1560         LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
1561         ted->ted_lcd->lcd_last_transno = tti->tti_transno;
1562         ted->ted_lcd->lcd_last_result = th->th_result;
1563
1564         tti->tti_off = ted->ted_lr_off;
1565         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
1566         mutex_unlock(&ted->ted_lcd_lock);
1567         RETURN(rc);
1568 }
1569
1570 static int tgt_clients_data_init(const struct lu_env *env,
1571                                  struct lu_target *tgt,
1572                                  unsigned long last_size)
1573 {
1574         struct obd_device       *obd = tgt->lut_obd;
1575         struct lr_server_data   *lsd = &tgt->lut_lsd;
1576         struct lsd_client_data  *lcd = NULL;
1577         struct tg_export_data   *ted;
1578         int                      cl_idx;
1579         int                      rc = 0;
1580         loff_t                   off = lsd->lsd_client_start;
1581         __u32                    generation = 0;
1582         struct cfs_hash         *hash = NULL;
1583
1584         ENTRY;
1585
1586         if (tgt->lut_bottom->dd_rdonly)
1587                 RETURN(0);
1588
1589         BUILD_BUG_ON(offsetof(struct lsd_client_data, lcd_padding) +
1590                      sizeof(lcd->lcd_padding) != LR_CLIENT_SIZE);
1591
1592         OBD_ALLOC_PTR(lcd);
1593         if (lcd == NULL)
1594                 RETURN(-ENOMEM);
1595
1596         hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
1597         if (hash == NULL)
1598                 GOTO(err_out, rc = -ENODEV);
1599
1600         for (cl_idx = 0; off < last_size; cl_idx++) {
1601                 struct obd_export       *exp;
1602                 __u64                    last_transno;
1603
1604                 /* Don't assume off is incremented properly by
1605                  * read_record(), in case sizeof(*lcd)
1606                  * isn't the same as fsd->lsd_client_size.  */
1607                 off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
1608                 rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
1609                 if (rc) {
1610                         CERROR("%s: error reading last_rcvd %s idx %d off "
1611                                "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
1612                                cl_idx, off, rc);
1613                         rc = 0;
1614                         break; /* read error shouldn't cause startup to fail */
1615                 }
1616
1617                 if (lcd->lcd_uuid[0] == '\0') {
1618                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
1619                                cl_idx);
1620                         continue;
1621                 }
1622
1623                 last_transno = lcd_last_transno(lcd);
1624
1625                 /* These exports are cleaned up by disconnect, so they
1626                  * need to be set up like real exports as connect does.
1627                  */
1628                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: %llu"
1629                        " srv lr: %llu lx: %llu gen %u\n", lcd->lcd_uuid,
1630                        cl_idx, last_transno, lsd->lsd_last_transno,
1631                        lcd_last_xid(lcd), lcd->lcd_generation);
1632
1633                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
1634                 if (IS_ERR(exp)) {
1635                         if (PTR_ERR(exp) == -EALREADY) {
1636                                 /* export already exists, zero out this one */
1637                                 CERROR("%s: Duplicate export %s!\n",
1638                                        tgt_name(tgt), lcd->lcd_uuid);
1639                                 continue;
1640                         }
1641                         GOTO(err_out, rc = PTR_ERR(exp));
1642                 }
1643
1644                 ted = &exp->exp_target_data;
1645                 *ted->ted_lcd = *lcd;
1646
1647                 rc = tgt_client_add(env, exp, cl_idx);
1648                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
1649                 /* VBR: set export last committed version */
1650                 exp->exp_last_committed = last_transno;
1651                 spin_lock(&exp->exp_lock);
1652                 exp->exp_connecting = 0;
1653                 exp->exp_in_recovery = 0;
1654                 spin_unlock(&exp->exp_lock);
1655                 atomic_inc(&obd->obd_max_recoverable_clients);
1656
1657                 if (tgt->lut_lsd.lsd_feature_incompat &
1658                     OBD_INCOMPAT_MULTI_RPCS &&
1659                     lcd->lcd_generation != 0) {
1660                         /* compute the highest valid client generation */
1661                         generation = max(generation, lcd->lcd_generation);
1662                         /* fill client_generation <-> export hash table */
1663                         rc = cfs_hash_add_unique(hash, &lcd->lcd_generation,
1664                                                  &exp->exp_gen_hash);
1665                         if (rc != 0) {
1666                                 CERROR("%s: duplicate export for client "
1667                                        "generation %u\n",
1668                                        tgt_name(tgt), lcd->lcd_generation);
1669                                 class_export_put(exp);
1670                                 GOTO(err_out, rc);
1671                         }
1672                 }
1673
1674                 class_export_put(exp);
1675
1676                 rc = rev_import_init(exp);
1677                 if (rc != 0) {
1678                         class_unlink_export(exp);
1679                         GOTO(err_out, rc);
1680                 }
1681
1682                 /* Need to check last_rcvd even for duplicated exports. */
1683                 CDEBUG(D_OTHER, "client at idx %d has last_transno = %llu\n",
1684                        cl_idx, last_transno);
1685
1686                 spin_lock(&tgt->lut_translock);
1687                 tgt->lut_last_transno = max(last_transno,
1688                                             tgt->lut_last_transno);
1689                 spin_unlock(&tgt->lut_translock);
1690         }
1691
1692         /* record highest valid client generation */
1693         atomic_set(&tgt->lut_client_generation, generation);
1694
1695 err_out:
1696         if (hash != NULL)
1697                 cfs_hash_putref(hash);
1698         OBD_FREE_PTR(lcd);
1699         RETURN(rc);
1700 }
1701
1702 struct server_compat_data {
1703         __u32 rocompat;
1704         __u32 incompat;
1705         __u32 rocinit;
1706         __u32 incinit;
1707 };
1708
1709 static struct server_compat_data tgt_scd[] = {
1710         [LDD_F_SV_TYPE_MDT] = {
1711                 .rocompat = OBD_ROCOMPAT_LOVOBJID,
1712                 .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1713                             OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
1714                             OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI |
1715                             OBD_INCOMPAT_MULTI_RPCS,
1716                 .rocinit = OBD_ROCOMPAT_LOVOBJID,
1717                 .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
1718                            OBD_INCOMPAT_MULTI_OI,
1719         },
1720         [LDD_F_SV_TYPE_OST] = {
1721                 .rocompat = OBD_ROCOMPAT_IDX_IN_IDIF,
1722                 .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
1723                             OBD_INCOMPAT_FID,
1724                 .rocinit = OBD_ROCOMPAT_IDX_IN_IDIF,
1725                 .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
1726         }
1727 };
1728
1729 int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
1730 {
1731         struct tgt_thread_info          *tti = tgt_th_info(env);
1732         struct lr_server_data           *lsd = &tgt->lut_lsd;
1733         unsigned long                    last_rcvd_size;
1734         __u32                            index;
1735         int                              rc, type;
1736
1737         rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr);
1738         if (rc)
1739                 RETURN(rc);
1740
1741         last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
1742
1743         /* ensure padding in the struct is the correct size */
1744         BUILD_BUG_ON(offsetof(struct lr_server_data, lsd_padding) +
1745                      sizeof(lsd->lsd_padding) != LR_SERVER_SIZE);
1746
1747         rc = server_name2index(tgt_name(tgt), &index, NULL);
1748         if (rc < 0) {
1749                 CERROR("%s: Can not get index from name: rc = %d\n",
1750                        tgt_name(tgt), rc);
1751                 RETURN(rc);
1752         }
1753         /* server_name2index() returns type */
1754         type = rc;
1755         if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
1756                 CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
1757                 RETURN(-EINVAL);
1758         }
1759
1760         /* last_rcvd on OST doesn't provide reconstruct support because there
1761          * may be up to 8 in-flight write requests per single slot in
1762          * last_rcvd client data
1763          */
1764         tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
1765
1766         if (last_rcvd_size == 0) {
1767                 LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
1768
1769                 memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
1770                        sizeof(lsd->lsd_uuid));
1771                 lsd->lsd_last_transno = 0;
1772                 lsd->lsd_mount_count = 0;
1773                 lsd->lsd_server_size = LR_SERVER_SIZE;
1774                 lsd->lsd_client_start = LR_CLIENT_START;
1775                 lsd->lsd_client_size = LR_CLIENT_SIZE;
1776                 lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
1777                 lsd->lsd_osd_index = index;
1778                 lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
1779                 lsd->lsd_feature_incompat = tgt_scd[type].incinit;
1780         } else {
1781                 rc = tgt_server_data_read(env, tgt);
1782                 if (rc) {
1783                         CERROR("%s: error reading LAST_RCVD: rc= %d\n",
1784                                tgt_name(tgt), rc);
1785                         RETURN(rc);
1786                 }
1787                 if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
1788                         if (tgt->lut_bottom->dd_rdonly) {
1789                                 /* Such difference may be caused by mounting
1790                                  * up snapshot with new fsname under rd_only
1791                                  * mode. But even if it was NOT, it will not
1792                                  * damage the system because of "rd_only". */
1793                                 memcpy(lsd->lsd_uuid,
1794                                        tgt->lut_obd->obd_uuid.uuid,
1795                                        sizeof(lsd->lsd_uuid));
1796                         } else {
1797                                 LCONSOLE_ERROR_MSG(0x157, "Trying to start "
1798                                                    "OBD %s using the wrong "
1799                                                    "disk %s. Were the /dev/ "
1800                                                    "assignments rearranged?\n",
1801                                                    tgt->lut_obd->obd_uuid.uuid,
1802                                                    lsd->lsd_uuid);
1803                                 RETURN(-EINVAL);
1804                         }
1805                 }
1806
1807                 if (lsd->lsd_osd_index != index) {
1808                         LCONSOLE_ERROR_MSG(0x157,
1809                                            "%s: index %d in last rcvd is different with the index %d in config log, It might be disk corruption!\n",
1810                                            tgt_name(tgt),
1811                                            lsd->lsd_osd_index, index);
1812                         RETURN(-EINVAL);
1813                 }
1814         }
1815
1816         if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
1817                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
1818                        tgt_name(tgt),
1819                        lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
1820                 RETURN(-EINVAL);
1821         }
1822
1823         if (type == LDD_F_SV_TYPE_MDT)
1824                 lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
1825
1826         if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
1827                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
1828                        tgt_name(tgt),
1829                        lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
1830                 RETURN(-EINVAL);
1831         }
1832         /** Interop: evict all clients at first boot with 1.8 last_rcvd */
1833         if (type == LDD_F_SV_TYPE_MDT &&
1834             !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
1835                 if (last_rcvd_size > lsd->lsd_client_start) {
1836                         LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
1837                                       "remove all clients for interop needs\n",
1838                                       tgt_name(tgt));
1839                         rc = tgt_truncate_last_rcvd(env, tgt,
1840                                                     lsd->lsd_client_start);
1841                         if (rc)
1842                                 RETURN(rc);
1843                         last_rcvd_size = lsd->lsd_client_start;
1844                 }
1845                 /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
1846                 lsd->lsd_feature_compat |= OBD_COMPAT_20;
1847         }
1848
1849         spin_lock(&tgt->lut_translock);
1850         tgt->lut_last_transno = lsd->lsd_last_transno;
1851         spin_unlock(&tgt->lut_translock);
1852
1853         lsd->lsd_mount_count++;
1854
1855         CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
1856         CDEBUG(D_INODE, "%s: server last_transno: %llu\n",
1857                tgt_name(tgt), tgt->lut_last_transno);
1858         CDEBUG(D_INODE, "%s: server mount_count: %llu\n",
1859                tgt_name(tgt), lsd->lsd_mount_count);
1860         CDEBUG(D_INODE, "%s: server data size: %u\n",
1861                tgt_name(tgt), lsd->lsd_server_size);
1862         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
1863                tgt_name(tgt), lsd->lsd_client_start);
1864         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
1865                tgt_name(tgt), lsd->lsd_client_size);
1866         CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
1867                tgt_name(tgt), last_rcvd_size);
1868         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
1869                tgt_name(tgt), lsd->lsd_subdir_count);
1870         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
1871                last_rcvd_size <= lsd->lsd_client_start ? 0 :
1872                (last_rcvd_size - lsd->lsd_client_start) /
1873                 lsd->lsd_client_size);
1874         CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
1875
1876         if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
1877             lsd->lsd_client_size == 0) {
1878                 CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
1879                 RETURN(-EINVAL);
1880         }
1881
1882         if (!tgt->lut_obd->obd_replayable)
1883                 CWARN("%s: recovery support OFF\n", tgt_name(tgt));
1884
1885         rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
1886         if (rc < 0)
1887                 GOTO(err_client, rc);
1888
1889         spin_lock(&tgt->lut_translock);
1890         /* obd_last_committed is used for compatibility
1891          * with other lustre recovery code */
1892         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
1893         spin_unlock(&tgt->lut_translock);
1894
1895         tgt->lut_obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
1896         tgt->lut_obd->u.obt.obt_instance = (__u32)lsd->lsd_mount_count;
1897
1898         /* save it, so mount count and last_transno is current */
1899         rc = tgt_server_data_update(env, tgt, 0);
1900         if (rc < 0)
1901                 GOTO(err_client, rc);
1902
1903         RETURN(0);
1904
1905 err_client:
1906         class_disconnect_exports(tgt->lut_obd);
1907         return rc;
1908 }
1909
1910 /* add credits for last_rcvd update */
1911 int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
1912                      void *cookie)
1913 {
1914         struct lu_target        *tgt = cookie;
1915         struct tgt_session_info *tsi;
1916         struct tgt_thread_info  *tti = tgt_th_info(env);
1917         struct dt_object        *dto;
1918         int                      rc;
1919
1920         /* For readonly case, the caller should have got failure
1921          * when start the transaction. If the logic comes here,
1922          * there must be something wrong. */
1923         if (unlikely(tgt->lut_bottom->dd_rdonly)) {
1924                 dump_stack();
1925                 LBUG();
1926         }
1927
1928         /* if there is no session, then this transaction is not result of
1929          * request processing but some local operation */
1930         if (env->le_ses == NULL)
1931                 return 0;
1932
1933         LASSERT(tgt->lut_last_rcvd);
1934         tsi = tgt_ses_info(env);
1935         /* OFD may start transaction without export assigned */
1936         if (tsi->tsi_exp == NULL)
1937                 return 0;
1938
1939         if (tgt_is_multimodrpcs_client(tsi->tsi_exp)) {
1940                 /*
1941                  * Use maximum possible file offset for declaration to ensure
1942                  * ZFS will reserve enough credits for a write anywhere in this
1943                  * file, since we don't know where in the file the write will be
1944                  * because a replay slot has not been assigned.  This should be
1945                  * replaced by dmu_tx_hold_append() when available.
1946                  */
1947                 tti->tti_buf.lb_buf = NULL;
1948                 tti->tti_buf.lb_len = sizeof(struct lsd_reply_data);
1949                 dto = dt_object_locate(tgt->lut_reply_data, th->th_dev);
1950                 rc = dt_declare_record_write(env, dto, &tti->tti_buf, -1, th);
1951                 if (rc)
1952                         return rc;
1953         } else {
1954                 dto = dt_object_locate(tgt->lut_last_rcvd, th->th_dev);
1955                 tti_buf_lcd(tti);
1956                 tti->tti_off = tsi->tsi_exp->exp_target_data.ted_lr_off;
1957                 rc = dt_declare_record_write(env, dto, &tti->tti_buf,
1958                                              tti->tti_off, th);
1959                 if (rc)
1960                         return rc;
1961         }
1962
1963         if (tsi->tsi_vbr_obj != NULL &&
1964             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
1965                 dto = dt_object_locate(tsi->tsi_vbr_obj, th->th_dev);
1966                 rc = dt_declare_version_set(env, dto, th);
1967         }
1968
1969         return rc;
1970 }
1971
1972 /* Update last_rcvd records with latests transaction data */
1973 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
1974                     void *cookie)
1975 {
1976         struct lu_target        *tgt = cookie;
1977         struct tgt_session_info *tsi;
1978         struct tgt_thread_info  *tti = tgt_th_info(env);
1979         struct dt_object        *obj = NULL;
1980         int                      rc;
1981         bool                     echo_client;
1982
1983         if (env->le_ses == NULL)
1984                 return 0;
1985
1986         tsi = tgt_ses_info(env);
1987         /* OFD may start transaction without export assigned */
1988         if (tsi->tsi_exp == NULL)
1989                 return 0;
1990
1991         echo_client = (tgt_ses_req(tsi) == NULL && tsi->tsi_xid == 0);
1992
1993         if (tti->tti_has_trans && !echo_client) {
1994                 if (tti->tti_mult_trans == 0) {
1995                         CDEBUG(D_HA, "More than one transaction %llu\n",
1996                                tti->tti_transno);
1997                         RETURN(0);
1998                 }
1999                 /* we need another transno to be assigned */
2000                 tti->tti_transno = 0;
2001         } else if (th->th_result == 0) {
2002                 tti->tti_has_trans = 1;
2003         }
2004
2005         if (tsi->tsi_vbr_obj != NULL &&
2006             !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
2007                 obj = tsi->tsi_vbr_obj;
2008         }
2009
2010         if (unlikely(echo_client)) /* echo client special case */
2011                 rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
2012                                                tsi->tsi_exp);
2013         else
2014                 rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
2015                                           tgt_ses_req(tsi));
2016         return rc;
2017 }
2018
2019 int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt)
2020 {
2021         struct tgt_thread_info  *tti = tgt_th_info(env);
2022         struct lsd_reply_data   *lrd = &tti->tti_lrd;
2023         unsigned long            reply_data_size;
2024         int                      rc;
2025         struct lsd_reply_header *lrh = NULL;
2026         struct tg_reply_data    *trd = NULL;
2027         int                      idx;
2028         loff_t                   off;
2029         struct cfs_hash         *hash = NULL;
2030         struct obd_export       *exp;
2031         struct tg_export_data   *ted;
2032         int                      reply_data_recovered = 0;
2033
2034         rc = dt_attr_get(env, tgt->lut_reply_data, &tti->tti_attr);
2035         if (rc)
2036                 GOTO(out, rc);
2037         reply_data_size = (unsigned long)tti->tti_attr.la_size;
2038
2039         OBD_ALLOC_PTR(lrh);
2040         if (lrh == NULL)
2041                 GOTO(out, rc = -ENOMEM);
2042
2043         if (reply_data_size == 0) {
2044                 CDEBUG(D_INFO, "%s: new reply_data file, initializing\n",
2045                        tgt_name(tgt));
2046                 lrh->lrh_magic = LRH_MAGIC;
2047                 lrh->lrh_header_size = sizeof(struct lsd_reply_header);
2048                 lrh->lrh_reply_size = sizeof(struct lsd_reply_data);
2049                 rc = tgt_reply_header_write(env, tgt, lrh);
2050                 if (rc) {
2051                         CERROR("%s: error writing %s: rc = %d\n",
2052                                tgt_name(tgt), REPLY_DATA, rc);
2053                         GOTO(out, rc);
2054                 }
2055         } else {
2056                 rc = tgt_reply_header_read(env, tgt, lrh);
2057                 if (rc) {
2058                         CERROR("%s: error reading %s: rc = %d\n",
2059                                tgt_name(tgt), REPLY_DATA, rc);
2060                         GOTO(out, rc);
2061                 }
2062                 if (lrh->lrh_magic != LRH_MAGIC ||
2063                     lrh->lrh_header_size != sizeof(struct lsd_reply_header) ||
2064                     lrh->lrh_reply_size != sizeof(struct lsd_reply_data)) {
2065                         CERROR("%s: invalid header in %s\n",
2066                                tgt_name(tgt), REPLY_DATA);
2067                         GOTO(out, rc = -EINVAL);
2068                 }
2069
2070                 hash = cfs_hash_getref(tgt->lut_obd->obd_gen_hash);
2071                 if (hash == NULL)
2072                         GOTO(out, rc = -ENODEV);
2073
2074                 OBD_ALLOC_PTR(trd);
2075                 if (trd == NULL)
2076                         GOTO(out, rc = -ENOMEM);
2077
2078                 /* Load reply_data from disk */
2079                 for (idx = 0, off = sizeof(struct lsd_reply_header);
2080                      off < reply_data_size;
2081                      idx++, off += sizeof(struct lsd_reply_data)) {
2082                         rc = tgt_reply_data_read(env, tgt, lrd, off);
2083                         if (rc) {
2084                                 CERROR("%s: error reading %s: rc = %d\n",
2085                                        tgt_name(tgt), REPLY_DATA, rc);
2086                                 GOTO(out, rc);
2087                         }
2088
2089                         exp = cfs_hash_lookup(hash, &lrd->lrd_client_gen);
2090                         if (exp == NULL) {
2091                                 /* old reply data from a disconnected client */
2092                                 continue;
2093                         }
2094                         ted = &exp->exp_target_data;
2095                         mutex_lock(&ted->ted_lcd_lock);
2096
2097                         /* create in-memory reply_data and link it to
2098                          * target export's reply list */
2099                         rc = tgt_set_reply_slot(tgt, idx);
2100                         if (rc != 0) {
2101                                 mutex_unlock(&ted->ted_lcd_lock);
2102                                 GOTO(out, rc);
2103                         }
2104                         trd->trd_reply = *lrd;
2105                         trd->trd_pre_versions[0] = 0;
2106                         trd->trd_pre_versions[1] = 0;
2107                         trd->trd_pre_versions[2] = 0;
2108                         trd->trd_pre_versions[3] = 0;
2109                         trd->trd_index = idx;
2110                         trd->trd_tag = 0;
2111                         list_add(&trd->trd_list, &ted->ted_reply_list);
2112                         ted->ted_reply_cnt++;
2113                         if (ted->ted_reply_cnt > ted->ted_reply_max)
2114                                 ted->ted_reply_max = ted->ted_reply_cnt;
2115
2116                         CDEBUG(D_HA, "%s: restore reply %p: xid %llu, "
2117                                "transno %llu, client gen %u, slot idx %d\n",
2118                                tgt_name(tgt), trd, lrd->lrd_xid,
2119                                lrd->lrd_transno, lrd->lrd_client_gen,
2120                                trd->trd_index);
2121
2122                         /* update export last committed transation */
2123                         exp->exp_last_committed = max(exp->exp_last_committed,
2124                                                       lrd->lrd_transno);
2125                         /* Update lcd_last_transno as well for check in
2126                          * tgt_release_reply_data() or the latest client
2127                          * transno can be lost.
2128                          */
2129                         ted->ted_lcd->lcd_last_transno =
2130                                 max(ted->ted_lcd->lcd_last_transno,
2131                                     exp->exp_last_committed);
2132
2133                         mutex_unlock(&ted->ted_lcd_lock);
2134                         class_export_put(exp);
2135
2136                         /* update target last committed transaction */
2137                         spin_lock(&tgt->lut_translock);
2138                         tgt->lut_last_transno = max(tgt->lut_last_transno,
2139                                                     lrd->lrd_transno);
2140                         spin_unlock(&tgt->lut_translock);
2141
2142                         reply_data_recovered++;
2143
2144                         OBD_ALLOC_PTR(trd);
2145                         if (trd == NULL)
2146                                 GOTO(out, rc = -ENOMEM);
2147                 }
2148                 CDEBUG(D_INFO, "%s: %d reply data have been recovered\n",
2149                        tgt_name(tgt), reply_data_recovered);
2150         }
2151
2152         spin_lock(&tgt->lut_translock);
2153         /* obd_last_committed is used for compatibility
2154          * with other lustre recovery code */
2155         tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
2156         spin_unlock(&tgt->lut_translock);
2157
2158         rc = 0;
2159
2160 out:
2161         if (hash != NULL)
2162                 cfs_hash_putref(hash);
2163         if (trd != NULL)
2164                 OBD_FREE_PTR(trd);
2165         if (lrh != NULL)
2166                 OBD_FREE_PTR(lrh);
2167         return rc;
2168 }
2169
2170 static int tgt_check_lookup_req(struct ptlrpc_request *req, int lookup,
2171                                 struct tg_reply_data *trd)
2172 {
2173         struct tg_export_data *ted = &req->rq_export->exp_target_data;
2174         struct lu_target *lut = class_exp2tgt(req->rq_export);
2175         __u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
2176         int rc = 0;
2177         struct tg_reply_data *reply;
2178         bool check_increasing;
2179
2180         if (tag == 0)
2181                 return 0;
2182
2183         check_increasing = tgt_is_increasing_xid_client(req->rq_export) &&
2184                            !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2185         if (!lookup && !check_increasing)
2186                 return 0;
2187
2188         list_for_each_entry(reply, &ted->ted_reply_list, trd_list) {
2189                 if (lookup && reply->trd_reply.lrd_xid == req->rq_xid) {
2190                         rc = 1;
2191                         if (trd != NULL)
2192                                 *trd = *reply;
2193                         break;
2194                 } else if (check_increasing && reply->trd_tag == tag &&
2195                            reply->trd_reply.lrd_xid > req->rq_xid) {
2196                         rc = -EPROTO;
2197                         CERROR("%s: busy tag=%u req_xid=%llu, trd=%p: xid=%llu transno=%llu client_gen=%u slot_idx=%d: rc = %d\n",
2198                                tgt_name(lut), tag, req->rq_xid, trd,
2199                                reply->trd_reply.lrd_xid,
2200                                reply->trd_reply.lrd_transno,
2201                                reply->trd_reply.lrd_client_gen,
2202                                reply->trd_index, rc);
2203                         break;
2204                 }
2205         }
2206
2207         return rc;
2208 }
2209
2210 /* Look for a reply data matching specified request @req
2211  * A copy is returned in @trd if the pointer is not NULL
2212  */
2213 int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd)
2214 {
2215         struct tg_export_data *ted = &req->rq_export->exp_target_data;
2216         int found = 0;
2217         bool not_replay = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY);
2218
2219         mutex_lock(&ted->ted_lcd_lock);
2220         if (not_replay && req->rq_xid <= req->rq_export->exp_last_xid) {
2221                 /* A check for the last_xid is needed here in case there is
2222                  * no reply data is left in the list. It may happen if another
2223                  * RPC on another slot increased the last_xid between our
2224                  * process_req_last_xid & tgt_lookup_reply calls */
2225                 found = -EPROTO;
2226         } else {
2227                 found = tgt_check_lookup_req(req, 1, trd);
2228         }
2229         mutex_unlock(&ted->ted_lcd_lock);
2230
2231         CDEBUG(D_TRACE, "%s: lookup reply xid %llu, found %d last_xid %llu\n",
2232                tgt_name(class_exp2tgt(req->rq_export)), req->rq_xid, found,
2233                req->rq_export->exp_last_xid);
2234
2235         return found;
2236 }
2237 EXPORT_SYMBOL(tgt_lookup_reply);
2238
2239 int tgt_handle_received_xid(struct obd_export *exp, __u64 rcvd_xid)
2240 {
2241         struct tg_export_data   *ted = &exp->exp_target_data;
2242         struct lu_target        *lut = class_exp2tgt(exp);
2243         struct tg_reply_data    *trd, *tmp;
2244
2245
2246         list_for_each_entry_safe(trd, tmp, &ted->ted_reply_list, trd_list) {
2247                 if (trd->trd_reply.lrd_xid > rcvd_xid)
2248                         continue;
2249                 ted->ted_release_xid++;
2250                 tgt_release_reply_data(lut, ted, trd);
2251         }
2252
2253         return 0;
2254 }
2255
2256 int tgt_handle_tag(struct ptlrpc_request *req)
2257 {
2258         return tgt_check_lookup_req(req, 0, NULL);
2259 }
2260