Whamcloud - gitweb
LU-56 ptlrpc: svc thread starting/stopping cleanup
[fs/lustre-release.git] / lustre / ptlrpc / target.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Common Target
37  * These are common function for MDT and OST recovery-related functionality
38  *
39  *   Author: Mikhail Pershin <tappro@sun.com>
40  */
41
42 #include <obd.h>
43 #include <lustre_fsfilt.h>
44 #include <obd_class.h>
45 #include <lustre_fid.h>
46
47 /**
48  * Common data shared by tg-level handlers. This is allocated per-thread to
49  * reduce stack consumption.
50  */
51 struct tg_thread_info {
52         /* server and client data buffers */
53         struct lr_server_data  tti_lsd;
54         struct lsd_client_data tti_lcd;
55         struct lu_buf          tti_buf;
56         loff_t                 tti_off;
57 };
58
59 static inline struct lu_buf *tti_buf_lsd(struct tg_thread_info *tti)
60 {
61         tti->tti_buf.lb_buf = &tti->tti_lsd;
62         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
63         return &tti->tti_buf;
64 }
65
66 static inline struct lu_buf *tti_buf_lcd(struct tg_thread_info *tti)
67 {
68         tti->tti_buf.lb_buf = &tti->tti_lcd;
69         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
70         return &tti->tti_buf;
71 }
72
73 extern struct lu_context_key tg_thread_key;
74
75 static inline struct tg_thread_info *tg_th_info(const struct lu_env *env)
76 {
77         struct tg_thread_info *tti;
78
79         tti = lu_context_key_get(&env->le_ctx, &tg_thread_key);
80         LASSERT(tti);
81         return tti;
82 }
83
84 /**
85  * Update client data in last_rcvd file. An obd API
86  */
87 static int obt_client_data_update(struct obd_export *exp)
88 {
89         struct tg_export_data *ted = &exp->exp_target_data;
90         struct obd_device_target *obt = &exp->exp_obd->u.obt;
91         struct lu_target *lut = class_exp2tgt(exp);
92         loff_t off = ted->ted_lr_off;
93         int rc = 0;
94
95         rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
96                                  ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
97
98         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
99                ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
100                le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
101
102         return rc;
103 }
104
105 /**
106  * Update server data in last_rcvd file. An obd API
107  */
108 int obt_server_data_update(struct lu_target *lut, int force_sync)
109 {
110         struct obd_device_target *obt = &lut->lut_obd->u.obt;
111         loff_t off = 0;
112         int rc;
113         ENTRY;
114
115         CDEBUG(D_SUPER,
116                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
117                lut->lut_lsd.lsd_uuid,
118                le64_to_cpu(lut->lut_lsd.lsd_mount_count),
119                le64_to_cpu(lut->lut_lsd.lsd_last_transno));
120
121         rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
122                                  &lut->lut_lsd, sizeof(lut->lut_lsd),
123                                  &off, force_sync);
124         if (rc)
125                 CERROR("error writing lr_server_data: rc = %d\n", rc);
126
127         RETURN(rc);
128 }
129
130 /**
131  * Update client epoch with server's one
132  */
133 void obt_client_epoch_update(struct obd_export *exp)
134 {
135         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
136         struct lu_target *lut = class_exp2tgt(exp);
137
138         /** VBR: set client last_epoch to current epoch */
139         if (le32_to_cpu(lcd->lcd_last_epoch) >=
140             le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
141                 return;
142         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
143         obt_client_data_update(exp);
144 }
145
146 /**
147  * Increment server epoch. An obd API
148  */
149 static void obt_boot_epoch_update(struct lu_target *lut)
150 {
151         struct obd_device *obd = lut->lut_obd;
152         __u32 start_epoch;
153         struct ptlrpc_request *req;
154         cfs_list_t client_list;
155
156         cfs_spin_lock(&lut->lut_translock);
157         start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
158         lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
159                                             LR_EPOCH_BITS);
160         lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
161         cfs_spin_unlock(&lut->lut_translock);
162
163         CFS_INIT_LIST_HEAD(&client_list);
164         cfs_spin_lock(&obd->obd_recovery_task_lock);
165         cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
166         cfs_spin_unlock(&obd->obd_recovery_task_lock);
167
168         /**
169          * go through list of exports participated in recovery and
170          * set new epoch for them
171          */
172         cfs_list_for_each_entry(req, &client_list, rq_list) {
173                 LASSERT(!req->rq_export->exp_delayed);
174                 obt_client_epoch_update(req->rq_export);
175         }
176         /** return list back at once */
177         cfs_spin_lock(&obd->obd_recovery_task_lock);
178         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
179         cfs_spin_unlock(&obd->obd_recovery_task_lock);
180         obt_server_data_update(lut, 1);
181 }
182
183 /**
184  * Allocate in-memory data for client slot related to export.
185  */
186 int lut_client_alloc(struct obd_export *exp)
187 {
188         LASSERT(exp != exp->exp_obd->obd_self_export);
189
190         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
191         if (exp->exp_target_data.ted_lcd == NULL)
192                 RETURN(-ENOMEM);
193         /* Mark that slot is not yet valid, 0 doesn't work here */
194         exp->exp_target_data.ted_lr_idx = -1;
195         RETURN(0);
196 }
197 EXPORT_SYMBOL(lut_client_alloc);
198
199 /**
200  * Free in-memory data for client slot related to export.
201  */
202 void lut_client_free(struct obd_export *exp)
203 {
204         struct tg_export_data *ted = &exp->exp_target_data;
205         struct lu_target *lut = class_exp2tgt(exp);
206
207         LASSERT(exp != exp->exp_obd->obd_self_export);
208
209         OBD_FREE_PTR(ted->ted_lcd);
210         ted->ted_lcd = NULL;
211
212         /* Slot may be not yet assigned */
213         if (ted->ted_lr_idx < 0)
214                 return;
215         /* Clear bit when lcd is freed */
216         LASSERT(lut->lut_client_bitmap);
217         if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
218                 CERROR("%s: client %u bit already clear in bitmap\n",
219                        exp->exp_obd->obd_name, ted->ted_lr_idx);
220                 LBUG();
221         }
222 }
223 EXPORT_SYMBOL(lut_client_free);
224
225 int lut_client_data_read(const struct lu_env *env, struct lu_target *tg,
226                          struct lsd_client_data *lcd, loff_t *off, int index)
227 {
228         struct tg_thread_info *tti = tg_th_info(env);
229         int rc;
230
231         tti_buf_lcd(tti);
232         rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, off);
233         if (rc == 0) {
234                 check_lcd(tg->lut_obd->obd_name, index, &tti->tti_lcd);
235                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
236         }
237
238         CDEBUG(D_INFO, "read lcd @%lld rc = %d, uuid = %s, last_transno = "LPU64
239                ", last_xid = "LPU64", last_result = %u, last_data = %u, "
240                "last_close_transno = "LPU64", last_close_xid = "LPU64", "
241                "last_close_result = %u\n", *off,
242                rc, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
243                lcd->lcd_last_result, lcd->lcd_last_data,
244                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
245                lcd->lcd_last_close_result);
246         return rc;
247 }
248 EXPORT_SYMBOL(lut_client_data_read);
249
250 int lut_client_data_write(const struct lu_env *env, struct lu_target *tg,
251                           struct lsd_client_data *lcd, loff_t *off,
252                           struct thandle *th)
253 {
254         struct tg_thread_info *tti = tg_th_info(env);
255
256         lcd_cpu_to_le(lcd, &tti->tti_lcd);
257         tti_buf_lcd(tti);
258
259         return dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf, off, th);
260 }
261 EXPORT_SYMBOL(lut_client_data_write);
262
263 /**
264  * Update client data in last_rcvd
265  */
266 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
267 {
268         struct tg_export_data *ted = &exp->exp_target_data;
269         struct lu_target      *tg = class_exp2tgt(exp);
270         struct tg_thread_info *tti = tg_th_info(env);
271         struct thandle        *th;
272         int                    rc = 0;
273
274         ENTRY;
275
276         th = dt_trans_create(env, tg->lut_bottom);
277         if (IS_ERR(th))
278                 RETURN(PTR_ERR(th));
279
280         rc = dt_declare_record_write(env, tg->lut_last_rcvd,
281                                      sizeof(struct lsd_client_data),
282                                      ted->ted_lr_off, th);
283         if (rc)
284                 GOTO(out, rc);
285
286         rc = dt_trans_start_local(env, tg->lut_bottom, th);
287         if (rc)
288                 GOTO(out, rc);
289         /*
290          * Until this operations will be committed the sync is needed
291          * for this export. This should be done _after_ starting the
292          * transaction so that many connecting clients will not bring
293          * server down with lots of sync writes.
294          */
295         rc = lut_new_client_cb_add(th, exp);
296         if (rc) {
297                 /* can't add callback, do sync now */
298                 th->th_sync = 1;
299         } else {
300                 cfs_spin_lock(&exp->exp_lock);
301                 exp->exp_need_sync = 1;
302                 cfs_spin_unlock(&exp->exp_lock);
303         }
304
305         tti->tti_off = ted->ted_lr_off;
306         rc = lut_client_data_write(env, tg, ted->ted_lcd, &tti->tti_off, th);
307         EXIT;
308 out:
309         dt_trans_stop(env, tg->lut_bottom, th);
310         CDEBUG(D_INFO, "write last_rcvd, rc = %d:\n"
311                "uuid = %s\nlast_transno = "LPU64"\n",
312                rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
313
314         return rc;
315 }
316
317 int lut_server_data_read(const struct lu_env *env, struct lu_target *tg)
318 {
319         struct tg_thread_info *tti = tg_th_info(env);
320         int rc;
321
322         tti->tti_off = 0;
323         tti_buf_lsd(tti);
324         rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, &tti->tti_off);
325         if (rc == 0)
326                 lsd_le_to_cpu(&tti->tti_lsd, &tg->lut_lsd);
327
328         CDEBUG(D_INFO, "%s: read last_rcvd header, rc = %d, uuid = %s, "
329                "last_transno = "LPU64"\n", tg->lut_obd->obd_name, rc,
330                tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
331         return rc;
332 }
333 EXPORT_SYMBOL(lut_server_data_read);
334
335 int lut_server_data_write(const struct lu_env *env, struct lu_target *tg,
336                           struct thandle *th)
337 {
338         struct tg_thread_info *tti = tg_th_info(env);
339         int rc;
340         ENTRY;
341
342         tti->tti_off = 0;
343         tti_buf_lsd(tti);
344         lsd_cpu_to_le(&tg->lut_lsd, &tti->tti_lsd);
345
346         rc = dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf,
347                              &tti->tti_off, th);
348
349         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
350                "uuid = %s\nlast_transno = "LPU64"\n",
351                rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
352
353         RETURN(rc);
354 }
355 EXPORT_SYMBOL(lut_server_data_write);
356
357 /**
358  * Update server data in last_rcvd
359  */
360 int lut_server_data_update(const struct lu_env *env, struct lu_target *tg,
361                            int sync)
362 {
363         struct tg_thread_info *tti = tg_th_info(env);
364         struct thandle  *th;
365         int                 rc = 0;
366
367         ENTRY;
368
369         CDEBUG(D_SUPER,
370                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
371                tg->lut_lsd.lsd_uuid, tg->lut_obd->u.obt.obt_mount_count,
372                tg->lut_last_transno);
373
374         /* Always save latest transno to keep it fresh */
375         cfs_spin_lock(&tg->lut_translock);
376         tg->lut_lsd.lsd_last_transno = tg->lut_last_transno;
377         cfs_spin_unlock(&tg->lut_translock);
378
379         th = dt_trans_create(env, tg->lut_bottom);
380         if (IS_ERR(th))
381                 RETURN(PTR_ERR(th));
382
383         th->th_sync = sync;
384
385         rc = dt_declare_record_write(env, tg->lut_last_rcvd,
386                                      sizeof(struct lr_server_data),
387                                      tti->tti_off, th);
388         if (rc)
389                 GOTO(out, rc);
390
391         rc = dt_trans_start(env, tg->lut_bottom, th);
392         if (rc)
393                 GOTO(out, rc);
394
395         rc = lut_server_data_write(env, tg, th);
396 out:
397         dt_trans_stop(env, tg->lut_bottom, th);
398
399         CDEBUG(D_INFO, "write last_rcvd header, rc = %d:\n"
400                "uuid = %s\nlast_transno = "LPU64"\n",
401                rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
402         RETURN(rc);
403 }
404 EXPORT_SYMBOL(lut_server_data_update);
405
406 int lut_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
407                            loff_t size)
408 {
409         struct dt_object *dt = tg->lut_last_rcvd;
410         struct thandle   *th;
411         struct lu_attr    attr;
412         int               rc;
413
414         ENTRY;
415
416         attr.la_size = size;
417         attr.la_valid = LA_SIZE;
418
419         th = dt_trans_create(env, tg->lut_bottom);
420         if (IS_ERR(th))
421                 RETURN(PTR_ERR(th));
422         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
423         if (rc)
424                 GOTO(cleanup, rc);
425         rc = dt_declare_attr_set(env, dt, &attr, th);
426         if (rc)
427                 GOTO(cleanup, rc);
428         rc = dt_trans_start_local(env, tg->lut_bottom, th);
429         if (rc)
430                 GOTO(cleanup, rc);
431
432         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA);
433         if (rc == 0)
434                 rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
435
436 cleanup:
437         dt_trans_stop(env, tg->lut_bottom, th);
438
439         RETURN(rc);
440 }
441 EXPORT_SYMBOL(lut_truncate_last_rcvd);
442
443 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
444 {
445         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
446         struct lu_target *lut = class_exp2tgt(exp);
447
448         LASSERT(lut->lut_bottom);
449         /** VBR: set client last_epoch to current epoch */
450         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
451                 return;
452         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
453         lut_client_data_update(env, exp);
454 }
455
456 /**
457  * Update boot epoch when recovery ends
458  */
459 void lut_boot_epoch_update(struct lu_target *lut)
460 {
461         struct lu_env env;
462         struct ptlrpc_request *req;
463         __u32 start_epoch;
464         cfs_list_t client_list;
465         int rc;
466
467         if (lut->lut_obd->obd_stopping)
468                 return;
469         /** Increase server epoch after recovery */
470         if (lut->lut_bottom == NULL)
471                 return obt_boot_epoch_update(lut);
472
473         rc = lu_env_init(&env, LCT_DT_THREAD);
474         if (rc) {
475                 CERROR("Can't initialize environment rc=%d\n", rc);
476                 return;
477         }
478
479         cfs_spin_lock(&lut->lut_translock);
480         start_epoch = lr_epoch(lut->lut_last_transno) + 1;
481         lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
482         lut->lut_lsd.lsd_start_epoch = start_epoch;
483         cfs_spin_unlock(&lut->lut_translock);
484
485         CFS_INIT_LIST_HEAD(&client_list);
486         /**
487          * The recovery is not yet finished and final queue can still be updated
488          * with resend requests. Move final list to separate one for processing
489          */
490         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
491         cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
492         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
493
494         /**
495          * go through list of exports participated in recovery and
496          * set new epoch for them
497          */
498         cfs_list_for_each_entry(req, &client_list, rq_list) {
499                 LASSERT(!req->rq_export->exp_delayed);
500                 if (!req->rq_export->exp_vbr_failed)
501                         lut_client_epoch_update(&env, req->rq_export);
502         }
503         /** return list back at once */
504         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
505         cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
506         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
507         /** update server epoch */
508         lut_server_data_update(&env, lut, 1);
509         lu_env_fini(&env);
510 }
511 EXPORT_SYMBOL(lut_boot_epoch_update);
512
513 /**
514  * commit callback, need to update last_commited value
515  */
516 struct lut_last_committed_callback {
517         struct dt_txn_commit_cb llcc_cb;
518         struct lu_target       *llcc_lut;
519         struct obd_export      *llcc_exp;
520         __u64                   llcc_transno;
521 };
522
523 void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
524                            struct dt_txn_commit_cb *cb, int err)
525 {
526         struct lut_last_committed_callback *ccb;
527
528         ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
529
530         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
531
532         cfs_spin_lock(&ccb->llcc_lut->lut_translock);
533         if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
534                 ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
535
536         LASSERT(ccb->llcc_exp);
537         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
538                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
539                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
540                 ptlrpc_commit_replies(ccb->llcc_exp);
541         } else {
542                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
543         }
544         class_export_cb_put(ccb->llcc_exp);
545         if (ccb->llcc_transno)
546                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
547                        ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
548         OBD_FREE_PTR(ccb);
549 }
550
551 int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
552                            struct obd_export *exp, __u64 transno)
553 {
554         struct lut_last_committed_callback *ccb;
555         int rc;
556
557         OBD_ALLOC_PTR(ccb);
558         if (ccb == NULL)
559                 return -ENOMEM;
560
561         ccb->llcc_cb.dcb_func = lut_cb_last_committed;
562         CFS_INIT_LIST_HEAD(&ccb->llcc_cb.dcb_linkage);
563         ccb->llcc_lut = lut;
564         ccb->llcc_exp = class_export_cb_get(exp);
565         ccb->llcc_transno = transno;
566
567         rc = dt_trans_cb_add(th, &ccb->llcc_cb);
568         if (rc) {
569                 class_export_cb_put(exp);
570                 OBD_FREE_PTR(ccb);
571         }
572         return rc;
573 }
574 EXPORT_SYMBOL(lut_last_commit_cb_add);
575
576 struct lut_new_client_callback {
577         struct dt_txn_commit_cb lncc_cb;
578         struct obd_export      *lncc_exp;
579 };
580
581 void lut_cb_new_client(struct lu_env *env, struct thandle *th,
582                        struct dt_txn_commit_cb *cb, int err)
583 {
584         struct lut_new_client_callback *ccb;
585
586         ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
587
588         LASSERT(ccb->lncc_exp->exp_obd);
589
590         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
591                ccb->lncc_exp->exp_obd->obd_name,
592                ccb->lncc_exp->exp_client_uuid.uuid);
593
594         cfs_spin_lock(&ccb->lncc_exp->exp_lock);
595         ccb->lncc_exp->exp_need_sync = 0;
596         cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
597         class_export_cb_put(ccb->lncc_exp);
598
599         OBD_FREE_PTR(ccb);
600 }
601
602 int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
603 {
604         struct lut_new_client_callback *ccb;
605         int rc;
606
607         OBD_ALLOC_PTR(ccb);
608         if (ccb == NULL)
609                 return -ENOMEM;
610
611         ccb->lncc_cb.dcb_func = lut_cb_new_client;
612         CFS_INIT_LIST_HEAD(&ccb->lncc_cb.dcb_linkage);
613         ccb->lncc_exp = class_export_cb_get(exp);
614
615         rc = dt_trans_cb_add(th, &ccb->lncc_cb);
616         if (rc) {
617                 class_export_cb_put(exp);
618                 OBD_FREE_PTR(ccb);
619         }
620         return rc;
621 }
622
623 /**
624  * Add new client to the last_rcvd upon new connection.
625  *
626  * We use a bitmap to locate a free space in the last_rcvd file and initialize
627  * tg_export_data.
628  */
629 int lut_client_new(const struct lu_env *env, struct obd_export *exp)
630 {
631         struct tg_export_data *ted = &exp->exp_target_data;
632         struct lu_target *tg = class_exp2tgt(exp);
633         int rc = 0, idx;
634
635         ENTRY;
636
637         LASSERT(tg->lut_client_bitmap != NULL);
638         if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid))
639                 RETURN(0);
640
641         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
642          * there's no need for extra complication here
643          */
644         idx = cfs_find_first_zero_bit(tg->lut_client_bitmap, LR_MAX_CLIENTS);
645 repeat:
646         if (idx >= LR_MAX_CLIENTS ||
647             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
648                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
649                        tg->lut_obd->obd_name,  idx);
650                 RETURN(-EOVERFLOW);
651         }
652         if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
653                 idx = cfs_find_next_zero_bit(tg->lut_client_bitmap,
654                                              LR_MAX_CLIENTS, idx);
655                 goto repeat;
656         }
657
658         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
659                tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
660
661         ted->ted_lr_idx = idx;
662         ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
663                           idx * tg->lut_lsd.lsd_client_size;
664
665         cfs_mutex_init(&ted->ted_lcd_lock);
666
667         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
668
669         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
670                tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
671                ted->ted_lcd->lcd_uuid);
672
673         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
674                 RETURN(-ENOSPC);
675
676         rc = lut_client_data_update(env, exp);
677         if (rc)
678                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
679                        tg->lut_obd->obd_name, idx, rc);
680
681         RETURN(rc);
682 }
683 EXPORT_SYMBOL(lut_client_new);
684
685 /* Add client data to the MDS.  We use a bitmap to locate a free space
686  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
687  * Otherwise, we just have to read the data from the last_rcvd file and
688  * we know its offset.
689  *
690  * It should not be possible to fail adding an existing client - otherwise
691  * mdt_init_server_data() callsite needs to be fixed.
692  */
693 int lut_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
694 {
695         struct tg_export_data *ted = &exp->exp_target_data;
696         struct lu_target *tg = class_exp2tgt(exp);
697
698         ENTRY;
699
700         LASSERT(tg->lut_client_bitmap != NULL);
701         LASSERTF(idx >= 0, "%d\n", idx);
702
703         if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid))
704                 RETURN(0);
705
706         if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
707                 CERROR("%s: client %d: bit already set in bitmap!!\n",
708                        tg->lut_obd->obd_name,  idx);
709                 LBUG();
710         }
711
712         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
713                tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
714
715         ted->ted_lr_idx = idx;
716         ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
717                           idx * tg->lut_lsd.lsd_client_size;
718
719         cfs_mutex_init(&ted->ted_lcd_lock);
720
721         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
722
723         RETURN(0);
724 }
725 EXPORT_SYMBOL(lut_client_add);
726
727 int lut_client_del(const struct lu_env *env, struct obd_export *exp)
728 {
729         struct tg_export_data *ted = &exp->exp_target_data;
730         struct lu_target      *tg = class_exp2tgt(exp);
731         int                    rc;
732
733         ENTRY;
734
735         LASSERT(ted->ted_lcd);
736
737         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
738         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
739                     (char *)tg->lut_obd->obd_uuid.uuid))
740                 RETURN(0);
741
742         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
743                tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
744                ted->ted_lcd->lcd_uuid);
745
746         /* Clear the bit _after_ zeroing out the client so we don't
747            race with filter_client_add and zero out new clients.*/
748         if (!cfs_test_bit(ted->ted_lr_idx, tg->lut_client_bitmap)) {
749                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
750                        tg->lut_obd->obd_name, ted->ted_lr_idx);
751                 LBUG();
752         }
753
754         /* Do not erase record for recoverable client. */
755         if (exp->exp_flags & OBD_OPT_FAILOVER)
756                 RETURN(0);
757
758         /* Make sure the server's last_transno is up to date.
759          * This should be done before zeroing client slot so last_transno will
760          * be in server data or in client data in case of failure */
761         rc = lut_server_data_update(env, tg, 0);
762         if (rc != 0) {
763                 CERROR("%s: failed to update server data, skip client %s "
764                        "zeroing, rc %d\n", tg->lut_obd->obd_name,
765                        ted->ted_lcd->lcd_uuid, rc);
766                 RETURN(rc);
767         }
768
769         cfs_mutex_lock(&ted->ted_lcd_lock);
770         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
771         rc = lut_client_data_update(env, exp);
772         cfs_mutex_unlock(&ted->ted_lcd_lock);
773
774         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
775                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
776                tg->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
777                ted->ted_lr_idx, ted->ted_lr_off, rc);
778         RETURN(rc);
779 }
780 EXPORT_SYMBOL(lut_client_del);
781
782 int lut_init(const struct lu_env *env, struct lu_target *lut,
783              struct obd_device *obd, struct dt_device *dt)
784 {
785         struct dt_object_format dof;
786         struct lu_attr          attr;
787         struct lu_fid           fid;
788         struct dt_object       *o;
789         int                     rc = 0;
790         ENTRY;
791
792         LASSERT(lut);
793         LASSERT(obd);
794         lut->lut_obd = obd;
795         lut->lut_bottom = dt;
796         lut->lut_last_rcvd = NULL;
797         obd->u.obt.obt_lut = lut;
798
799         cfs_spin_lock_init(&lut->lut_translock);
800
801         OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
802         if (lut->lut_client_bitmap == NULL)
803                 RETURN(-ENOMEM);
804
805         /** obdfilter has no lu_device stack yet */
806         if (dt == NULL)
807                 RETURN(rc);
808
809         memset(&attr, 0, sizeof(attr));
810         attr.la_valid = LA_MODE;
811         attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
812         dof.dof_type = dt_mode_to_dft(S_IFREG);
813
814         lu_local_obj_fid(&fid, MDT_LAST_RECV_OID);
815
816         o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
817         if (!IS_ERR(o)) {
818                 lut->lut_last_rcvd = o;
819         } else {
820                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
821                 lut->lut_client_bitmap = NULL;
822                 rc = PTR_ERR(o);
823                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
824         }
825
826         RETURN(rc);
827 }
828 EXPORT_SYMBOL(lut_init);
829
830 void lut_fini(const struct lu_env *env, struct lu_target *lut)
831 {
832         ENTRY;
833
834         if (lut->lut_client_bitmap) {
835                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
836                 lut->lut_client_bitmap = NULL;
837         }
838         if (lut->lut_last_rcvd) {
839                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
840                 lut->lut_last_rcvd = NULL;
841         }
842         EXIT;
843 }
844 EXPORT_SYMBOL(lut_fini);
845
846 /* context key constructor/destructor: tg_key_init, tg_key_fini */
847 LU_KEY_INIT_FINI(tg, struct tg_thread_info);
848 /* context key: tg_thread_key */
849 LU_CONTEXT_KEY_DEFINE(tg, LCT_MD_THREAD|LCT_DT_THREAD);
850 LU_KEY_INIT_GENERIC(tg);
851 EXPORT_SYMBOL(tg_thread_key);
852
853 int lut_mod_init(void)
854 {
855         tg_key_init_generic(&tg_thread_key, NULL);
856         lu_context_key_register_many(&tg_thread_key, NULL);
857         return 0;
858 }
859
860 void lut_mod_exit(void)
861 {
862         lu_context_key_degister_many(&tg_thread_key, NULL);
863 }
864