Whamcloud - gitweb
LU-2955 tests: make replay-ost-single/8b SLOW for ZFS
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Unified Target
37  * These are common function to work with last_received file
38  *
39  * Author: Mikhail Pershin <mike.pershin@intel.com>
40  */
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <lustre_fid.h>
44
45 #include "tgt_internal.h"
46
47 static inline struct lu_buf *tti_buf_lsd(struct tgt_thread_info *tti)
48 {
49         tti->tti_buf.lb_buf = &tti->tti_lsd;
50         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
51         return &tti->tti_buf;
52 }
53
54 static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
55 {
56         tti->tti_buf.lb_buf = &tti->tti_lcd;
57         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
58         return &tti->tti_buf;
59 }
60
61 /**
62  * Allocate in-memory data for client slot related to export.
63  */
64 int tgt_client_alloc(struct obd_export *exp)
65 {
66         ENTRY;
67         LASSERT(exp != exp->exp_obd->obd_self_export);
68
69         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
70         if (exp->exp_target_data.ted_lcd == NULL)
71                 RETURN(-ENOMEM);
72         /* Mark that slot is not yet valid, 0 doesn't work here */
73         exp->exp_target_data.ted_lr_idx = -1;
74         RETURN(0);
75 }
76 EXPORT_SYMBOL(tgt_client_alloc);
77
78 /**
79  * Free in-memory data for client slot related to export.
80  */
81 void tgt_client_free(struct obd_export *exp)
82 {
83         struct tg_export_data   *ted = &exp->exp_target_data;
84         struct lu_target        *lut = class_exp2tgt(exp);
85
86         LASSERT(exp != exp->exp_obd->obd_self_export);
87
88         OBD_FREE_PTR(ted->ted_lcd);
89         ted->ted_lcd = NULL;
90
91         /* Slot may be not yet assigned */
92         if (ted->ted_lr_idx < 0)
93                 return;
94         /* Clear bit when lcd is freed */
95         LASSERT(lut->lut_client_bitmap);
96         if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
97                 CERROR("%s: client %u bit already clear in bitmap\n",
98                        exp->exp_obd->obd_name, ted->ted_lr_idx);
99                 LBUG();
100         }
101 }
102 EXPORT_SYMBOL(tgt_client_free);
103
104 int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
105                          struct lsd_client_data *lcd, loff_t *off, int index)
106 {
107         struct tgt_thread_info  *tti = tgt_th_info(env);
108         int                      rc;
109
110         tti_buf_lcd(tti);
111         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
112         if (rc == 0) {
113                 check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
114                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
115         }
116
117         CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = "LPU64
118                ", last_xid = "LPU64", last_result = %u, last_data = %u, "
119                "last_close_transno = "LPU64", last_close_xid = "LPU64", "
120                "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
121                *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
122                lcd->lcd_last_result, lcd->lcd_last_data,
123                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
124                lcd->lcd_last_close_result, rc);
125         return rc;
126 }
127 EXPORT_SYMBOL(tgt_client_data_read);
128
129 int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
130                           struct lsd_client_data *lcd, loff_t *off,
131                           struct thandle *th)
132 {
133         struct tgt_thread_info *tti = tgt_th_info(env);
134
135         lcd_cpu_to_le(lcd, &tti->tti_lcd);
136         tti_buf_lcd(tti);
137
138         return dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf, off, th);
139 }
140 EXPORT_SYMBOL(tgt_client_data_write);
141
142 /**
143  * Update client data in last_rcvd
144  */
145 int tgt_client_data_update(const struct lu_env *env, struct obd_export *exp)
146 {
147         struct tg_export_data   *ted = &exp->exp_target_data;
148         struct lu_target        *tgt = class_exp2tgt(exp);
149         struct tgt_thread_info  *tti = tgt_th_info(env);
150         struct thandle          *th;
151         int                      rc = 0;
152
153         ENTRY;
154
155         th = dt_trans_create(env, tgt->lut_bottom);
156         if (IS_ERR(th))
157                 RETURN(PTR_ERR(th));
158
159         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
160                                      sizeof(struct lsd_client_data),
161                                      ted->ted_lr_off, th);
162         if (rc)
163                 GOTO(out, rc);
164
165         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
166         if (rc)
167                 GOTO(out, rc);
168         /*
169          * Until this operations will be committed the sync is needed
170          * for this export. This should be done _after_ starting the
171          * transaction so that many connecting clients will not bring
172          * server down with lots of sync writes.
173          */
174         rc = tgt_new_client_cb_add(th, exp);
175         if (rc) {
176                 /* can't add callback, do sync now */
177                 th->th_sync = 1;
178         } else {
179                 spin_lock(&exp->exp_lock);
180                 exp->exp_need_sync = 1;
181                 spin_unlock(&exp->exp_lock);
182         }
183
184         tti->tti_off = ted->ted_lr_off;
185         rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
186         EXIT;
187 out:
188         dt_trans_stop(env, tgt->lut_bottom, th);
189         CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
190                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
191                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
192
193         return rc;
194 }
195
196 int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
197 {
198         struct tgt_thread_info  *tti = tgt_th_info(env);
199         int                      rc;
200
201         tti->tti_off = 0;
202         tti_buf_lsd(tti);
203         rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
204                             &tti->tti_off);
205         if (rc == 0)
206                 lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
207
208         CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
209                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
210                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
211         return rc;
212 }
213 EXPORT_SYMBOL(tgt_server_data_read);
214
215 int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
216                           struct thandle *th)
217 {
218         struct tgt_thread_info  *tti = tgt_th_info(env);
219         int                      rc;
220
221         ENTRY;
222
223         tti->tti_off = 0;
224         tti_buf_lsd(tti);
225         lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
226
227         rc = dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf,
228                              &tti->tti_off, th);
229
230         CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
231                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
232                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
233
234         RETURN(rc);
235 }
236 EXPORT_SYMBOL(tgt_server_data_write);
237
238 /**
239  * Update server data in last_rcvd
240  */
241 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
242                            int sync)
243 {
244         struct tgt_thread_info  *tti = tgt_th_info(env);
245         struct thandle          *th;
246         int                      rc = 0;
247
248         ENTRY;
249
250         CDEBUG(D_SUPER,
251                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
252                tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
253                tgt->lut_last_transno);
254
255         /* Always save latest transno to keep it fresh */
256         spin_lock(&tgt->lut_translock);
257         tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
258         spin_unlock(&tgt->lut_translock);
259
260         th = dt_trans_create(env, tgt->lut_bottom);
261         if (IS_ERR(th))
262                 RETURN(PTR_ERR(th));
263
264         th->th_sync = sync;
265
266         rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
267                                      sizeof(struct lr_server_data),
268                                      tti->tti_off, th);
269         if (rc)
270                 GOTO(out, rc);
271
272         rc = dt_trans_start(env, tgt->lut_bottom, th);
273         if (rc)
274                 GOTO(out, rc);
275
276         rc = tgt_server_data_write(env, tgt, th);
277 out:
278         dt_trans_stop(env, tgt->lut_bottom, th);
279
280         CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
281                "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
282                tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
283         RETURN(rc);
284 }
285 EXPORT_SYMBOL(tgt_server_data_update);
286
287 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
288                            loff_t size)
289 {
290         struct dt_object *dt = tgt->lut_last_rcvd;
291         struct thandle   *th;
292         struct lu_attr    attr;
293         int               rc;
294
295         ENTRY;
296
297         attr.la_size = size;
298         attr.la_valid = LA_SIZE;
299
300         th = dt_trans_create(env, tgt->lut_bottom);
301         if (IS_ERR(th))
302                 RETURN(PTR_ERR(th));
303         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
304         if (rc)
305                 GOTO(cleanup, rc);
306         rc = dt_declare_attr_set(env, dt, &attr, th);
307         if (rc)
308                 GOTO(cleanup, rc);
309         rc = dt_trans_start_local(env, tgt->lut_bottom, th);
310         if (rc)
311                 GOTO(cleanup, rc);
312
313         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA);
314         if (rc == 0)
315                 rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
316
317 cleanup:
318         dt_trans_stop(env, tgt->lut_bottom, th);
319
320         RETURN(rc);
321 }
322 EXPORT_SYMBOL(tgt_truncate_last_rcvd);
323
324 void tgt_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
325 {
326         struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
327         struct lu_target        *tgt = class_exp2tgt(exp);
328
329         LASSERT(tgt->lut_bottom);
330         /** VBR: set client last_epoch to current epoch */
331         if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
332                 return;
333         lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
334         tgt_client_data_update(env, exp);
335 }
336
337 /**
338  * Update boot epoch when recovery ends
339  */
340 void tgt_boot_epoch_update(struct lu_target *tgt)
341 {
342         struct lu_env            env;
343         struct ptlrpc_request   *req;
344         __u32                    start_epoch;
345         cfs_list_t               client_list;
346         int                      rc;
347
348         if (tgt->lut_obd->obd_stopping)
349                 return;
350
351         rc = lu_env_init(&env, LCT_DT_THREAD);
352         if (rc) {
353                 CERROR("%s: can't initialize environment: rc = %d\n",
354                         tgt->lut_obd->obd_name, rc);
355                 return;
356         }
357
358         spin_lock(&tgt->lut_translock);
359         start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
360         tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
361         tgt->lut_lsd.lsd_start_epoch = start_epoch;
362         spin_unlock(&tgt->lut_translock);
363
364         CFS_INIT_LIST_HEAD(&client_list);
365         /**
366          * The recovery is not yet finished and final queue can still be updated
367          * with resend requests. Move final list to separate one for processing
368          */
369         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
370         cfs_list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
371         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
372
373         /**
374          * go through list of exports participated in recovery and
375          * set new epoch for them
376          */
377         cfs_list_for_each_entry(req, &client_list, rq_list) {
378                 LASSERT(!req->rq_export->exp_delayed);
379                 if (!req->rq_export->exp_vbr_failed)
380                         tgt_client_epoch_update(&env, req->rq_export);
381         }
382         /** return list back at once */
383         spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
384         cfs_list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
385         spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
386         /** update server epoch */
387         tgt_server_data_update(&env, tgt, 1);
388         lu_env_fini(&env);
389 }
390 EXPORT_SYMBOL(tgt_boot_epoch_update);
391
392 /**
393  * commit callback, need to update last_commited value
394  */
395 struct tgt_last_committed_callback {
396         struct dt_txn_commit_cb  llcc_cb;
397         struct lu_target        *llcc_tgt;
398         struct obd_export       *llcc_exp;
399         __u64                    llcc_transno;
400 };
401
402 void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
403                            struct dt_txn_commit_cb *cb, int err)
404 {
405         struct tgt_last_committed_callback *ccb;
406
407         ccb = container_of0(cb, struct tgt_last_committed_callback, llcc_cb);
408
409         LASSERT(ccb->llcc_tgt != NULL);
410         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
411
412         spin_lock(&ccb->llcc_tgt->lut_translock);
413         if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
414                 ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
415
416         LASSERT(ccb->llcc_exp);
417         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
418                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
419                 spin_unlock(&ccb->llcc_tgt->lut_translock);
420                 ptlrpc_commit_replies(ccb->llcc_exp);
421         } else {
422                 spin_unlock(&ccb->llcc_tgt->lut_translock);
423         }
424         class_export_cb_put(ccb->llcc_exp);
425         if (ccb->llcc_transno)
426                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
427                        ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
428         OBD_FREE_PTR(ccb);
429 }
430
431 int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
432                            struct obd_export *exp, __u64 transno)
433 {
434         struct tgt_last_committed_callback      *ccb;
435         struct dt_txn_commit_cb                 *dcb;
436         int                                      rc;
437
438         OBD_ALLOC_PTR(ccb);
439         if (ccb == NULL)
440                 return -ENOMEM;
441
442         ccb->llcc_tgt = tgt;
443         ccb->llcc_exp = class_export_cb_get(exp);
444         ccb->llcc_transno = transno;
445
446         dcb = &ccb->llcc_cb;
447         dcb->dcb_func = tgt_cb_last_committed;
448         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
449         strncpy(dcb->dcb_name, "tgt_cb_last_committed", MAX_COMMIT_CB_STR_LEN);
450         dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
451
452         rc = dt_trans_cb_add(th, dcb);
453         if (rc) {
454                 class_export_cb_put(exp);
455                 OBD_FREE_PTR(ccb);
456         }
457
458         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
459                 /* report failure to force synchronous operation */
460                 return -EPERM;
461
462         return rc;
463 }
464 EXPORT_SYMBOL(tgt_last_commit_cb_add);
465
466 struct tgt_new_client_callback {
467         struct dt_txn_commit_cb  lncc_cb;
468         struct obd_export       *lncc_exp;
469 };
470
471 void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
472                        struct dt_txn_commit_cb *cb, int err)
473 {
474         struct tgt_new_client_callback *ccb;
475
476         ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
477
478         LASSERT(ccb->lncc_exp->exp_obd);
479
480         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
481                ccb->lncc_exp->exp_obd->obd_name,
482                ccb->lncc_exp->exp_client_uuid.uuid);
483
484         spin_lock(&ccb->lncc_exp->exp_lock);
485         ccb->lncc_exp->exp_need_sync = 0;
486         spin_unlock(&ccb->lncc_exp->exp_lock);
487         class_export_cb_put(ccb->lncc_exp);
488
489         OBD_FREE_PTR(ccb);
490 }
491
492 int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
493 {
494         struct tgt_new_client_callback  *ccb;
495         struct dt_txn_commit_cb         *dcb;
496         int                              rc;
497
498         OBD_ALLOC_PTR(ccb);
499         if (ccb == NULL)
500                 return -ENOMEM;
501
502         ccb->lncc_exp = class_export_cb_get(exp);
503
504         dcb = &ccb->lncc_cb;
505         dcb->dcb_func = tgt_cb_new_client;
506         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
507         strncpy(dcb->dcb_name, "tgt_cb_new_client", MAX_COMMIT_CB_STR_LEN);
508         dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
509
510         rc = dt_trans_cb_add(th, dcb);
511         if (rc) {
512                 class_export_cb_put(exp);
513                 OBD_FREE_PTR(ccb);
514         }
515         return rc;
516 }
517
518 /**
519  * Add new client to the last_rcvd upon new connection.
520  *
521  * We use a bitmap to locate a free space in the last_rcvd file and initialize
522  * tg_export_data.
523  */
524 int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
525 {
526         struct tg_export_data   *ted = &exp->exp_target_data;
527         struct lu_target        *tgt = class_exp2tgt(exp);
528         int                      rc = 0, idx;
529
530         ENTRY;
531
532         LASSERT(tgt->lut_client_bitmap != NULL);
533         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
534                 RETURN(0);
535
536         mutex_init(&ted->ted_lcd_lock);
537
538         if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
539                 RETURN(0);
540
541         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
542          * there's no need for extra complication here
543          */
544         idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
545 repeat:
546         if (idx >= LR_MAX_CLIENTS ||
547             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
548                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
549                        tgt->lut_obd->obd_name,  idx);
550                 RETURN(-EOVERFLOW);
551         }
552         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
553                 idx = find_next_zero_bit(tgt->lut_client_bitmap,
554                                              LR_MAX_CLIENTS, idx);
555                 goto repeat;
556         }
557
558         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
559                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
560
561         ted->ted_lr_idx = idx;
562         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
563                           idx * tgt->lut_lsd.lsd_client_size;
564
565         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
566
567         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
568                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
569                ted->ted_lcd->lcd_uuid);
570
571         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
572                 RETURN(-ENOSPC);
573
574         rc = tgt_client_data_update(env, exp);
575         if (rc)
576                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
577                        tgt->lut_obd->obd_name, idx, rc);
578
579         RETURN(rc);
580 }
581 EXPORT_SYMBOL(tgt_client_new);
582
583 /* Add client data to the MDS.  We use a bitmap to locate a free space
584  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
585  * Otherwise, we just have to read the data from the last_rcvd file and
586  * we know its offset.
587  *
588  * It should not be possible to fail adding an existing client - otherwise
589  * mdt_init_server_data() callsite needs to be fixed.
590  */
591 int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
592 {
593         struct tg_export_data   *ted = &exp->exp_target_data;
594         struct lu_target        *tgt = class_exp2tgt(exp);
595
596         ENTRY;
597
598         LASSERT(tgt->lut_client_bitmap != NULL);
599         LASSERTF(idx >= 0, "%d\n", idx);
600
601         if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
602             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
603                 RETURN(0);
604
605         if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
606                 CERROR("%s: client %d: bit already set in bitmap!!\n",
607                        tgt->lut_obd->obd_name,  idx);
608                 LBUG();
609         }
610
611         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
612                tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
613
614         ted->ted_lr_idx = idx;
615         ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
616                           idx * tgt->lut_lsd.lsd_client_size;
617
618         mutex_init(&ted->ted_lcd_lock);
619
620         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
621
622         RETURN(0);
623 }
624 EXPORT_SYMBOL(tgt_client_add);
625
626 int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
627 {
628         struct tg_export_data   *ted = &exp->exp_target_data;
629         struct lu_target        *tgt = class_exp2tgt(exp);
630         int                      rc;
631
632         ENTRY;
633
634         LASSERT(ted->ted_lcd);
635
636         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
637         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
638                     (char *)tgt->lut_obd->obd_uuid.uuid) ||
639             exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
640                 RETURN(0);
641
642         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
643                tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
644                ted->ted_lcd->lcd_uuid);
645
646         /* Clear the bit _after_ zeroing out the client so we don't
647            race with filter_client_add and zero out new clients.*/
648         if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
649                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
650                        tgt->lut_obd->obd_name, ted->ted_lr_idx);
651                 LBUG();
652         }
653
654         /* Do not erase record for recoverable client. */
655         if (exp->exp_flags & OBD_OPT_FAILOVER)
656                 RETURN(0);
657
658         /* Make sure the server's last_transno is up to date.
659          * This should be done before zeroing client slot so last_transno will
660          * be in server data or in client data in case of failure */
661         rc = tgt_server_data_update(env, tgt, 0);
662         if (rc != 0) {
663                 CERROR("%s: failed to update server data, skip client %s "
664                        "zeroing, rc %d\n", tgt->lut_obd->obd_name,
665                        ted->ted_lcd->lcd_uuid, rc);
666                 RETURN(rc);
667         }
668
669         mutex_lock(&ted->ted_lcd_lock);
670         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
671         rc = tgt_client_data_update(env, exp);
672         mutex_unlock(&ted->ted_lcd_lock);
673
674         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
675                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
676                tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
677                ted->ted_lr_idx, ted->ted_lr_off, rc);
678         RETURN(rc);
679 }
680 EXPORT_SYMBOL(tgt_client_del);