Whamcloud - gitweb
LU-1303 lod: introduce lod device
[fs/lustre-release.git] / lustre / ptlrpc / target.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Lustre Common Target
37  * These are common function for MDT and OST recovery-related functionality
38  *
39  *   Author: Mikhail Pershin <tappro@sun.com>
40  */
41
42 #include <obd.h>
43 #include <lustre_fsfilt.h>
44 #include <obd_class.h>
45 #include <lustre_fid.h>
46
47 /**
48  * Common data shared by tg-level handlers. This is allocated per-thread to
49  * reduce stack consumption.
50  */
51 struct tg_thread_info {
52         /* server and client data buffers */
53         struct lr_server_data  tti_lsd;
54         struct lsd_client_data tti_lcd;
55         struct lu_buf          tti_buf;
56         loff_t                 tti_off;
57 };
58
59 static inline struct lu_buf *tti_buf_lsd(struct tg_thread_info *tti)
60 {
61         tti->tti_buf.lb_buf = &tti->tti_lsd;
62         tti->tti_buf.lb_len = sizeof(tti->tti_lsd);
63         return &tti->tti_buf;
64 }
65
66 static inline struct lu_buf *tti_buf_lcd(struct tg_thread_info *tti)
67 {
68         tti->tti_buf.lb_buf = &tti->tti_lcd;
69         tti->tti_buf.lb_len = sizeof(tti->tti_lcd);
70         return &tti->tti_buf;
71 }
72
73 extern struct lu_context_key tg_thread_key;
74
75 static inline struct tg_thread_info *tg_th_info(const struct lu_env *env)
76 {
77         struct tg_thread_info *tti;
78
79         tti = lu_context_key_get(&env->le_ctx, &tg_thread_key);
80         LASSERT(tti);
81         return tti;
82 }
83
84 /**
85  * Update client data in last_rcvd file. An obd API
86  */
87 static int obt_client_data_update(struct obd_export *exp)
88 {
89         struct tg_export_data *ted = &exp->exp_target_data;
90         struct obd_device_target *obt = &exp->exp_obd->u.obt;
91         struct lu_target *lut = class_exp2tgt(exp);
92         loff_t off = ted->ted_lr_off;
93         int rc = 0;
94
95         rc = fsfilt_write_record(exp->exp_obd, obt->obt_rcvd_filp,
96                                  ted->ted_lcd, sizeof(*ted->ted_lcd), &off, 0);
97
98         CDEBUG(D_INFO, "update client idx %u last_epoch %#x (%#x)\n",
99                ted->ted_lr_idx, le32_to_cpu(ted->ted_lcd->lcd_last_epoch),
100                le32_to_cpu(lut->lut_lsd.lsd_start_epoch));
101
102         return rc;
103 }
104
105 /**
106  * Update server data in last_rcvd file. An obd API
107  */
108 int obt_server_data_update(struct lu_target *lut, int force_sync)
109 {
110         struct obd_device_target *obt = &lut->lut_obd->u.obt;
111         loff_t off = 0;
112         int rc;
113         ENTRY;
114
115         CDEBUG(D_SUPER,
116                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
117                lut->lut_lsd.lsd_uuid,
118                le64_to_cpu(lut->lut_lsd.lsd_mount_count),
119                le64_to_cpu(lut->lut_lsd.lsd_last_transno));
120
121         rc = fsfilt_write_record(lut->lut_obd, obt->obt_rcvd_filp,
122                                  &lut->lut_lsd, sizeof(lut->lut_lsd),
123                                  &off, force_sync);
124         if (rc)
125                 CERROR("error writing lr_server_data: rc = %d\n", rc);
126
127         RETURN(rc);
128 }
129
130 /**
131  * Update client epoch with server's one
132  */
133 void obt_client_epoch_update(struct obd_export *exp)
134 {
135         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
136         struct lu_target *lut = class_exp2tgt(exp);
137
138         /** VBR: set client last_epoch to current epoch */
139         if (le32_to_cpu(lcd->lcd_last_epoch) >=
140             le32_to_cpu(lut->lut_lsd.lsd_start_epoch))
141                 return;
142         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
143         obt_client_data_update(exp);
144 }
145
146 /**
147  * Increment server epoch. An obd API
148  */
149 static void obt_boot_epoch_update(struct lu_target *lut)
150 {
151         struct obd_device *obd = lut->lut_obd;
152         __u32 start_epoch;
153         struct ptlrpc_request *req;
154         cfs_list_t client_list;
155
156         cfs_spin_lock(&lut->lut_translock);
157         start_epoch = lr_epoch(le64_to_cpu(lut->lut_last_transno)) + 1;
158         lut->lut_last_transno = cpu_to_le64((__u64)start_epoch <<
159                                             LR_EPOCH_BITS);
160         lut->lut_lsd.lsd_start_epoch = cpu_to_le32(start_epoch);
161         cfs_spin_unlock(&lut->lut_translock);
162
163         CFS_INIT_LIST_HEAD(&client_list);
164         cfs_spin_lock(&obd->obd_recovery_task_lock);
165         cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
166         cfs_spin_unlock(&obd->obd_recovery_task_lock);
167
168         /**
169          * go through list of exports participated in recovery and
170          * set new epoch for them
171          */
172         cfs_list_for_each_entry(req, &client_list, rq_list) {
173                 LASSERT(!req->rq_export->exp_delayed);
174                 obt_client_epoch_update(req->rq_export);
175         }
176         /** return list back at once */
177         cfs_spin_lock(&obd->obd_recovery_task_lock);
178         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
179         cfs_spin_unlock(&obd->obd_recovery_task_lock);
180         obt_server_data_update(lut, 1);
181 }
182
183 /**
184  * Allocate in-memory data for client slot related to export.
185  */
186 int lut_client_alloc(struct obd_export *exp)
187 {
188         LASSERT(exp != exp->exp_obd->obd_self_export);
189
190         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
191         if (exp->exp_target_data.ted_lcd == NULL)
192                 RETURN(-ENOMEM);
193         /* Mark that slot is not yet valid, 0 doesn't work here */
194         exp->exp_target_data.ted_lr_idx = -1;
195         RETURN(0);
196 }
197 EXPORT_SYMBOL(lut_client_alloc);
198
199 /**
200  * Free in-memory data for client slot related to export.
201  */
202 void lut_client_free(struct obd_export *exp)
203 {
204         struct tg_export_data *ted = &exp->exp_target_data;
205         struct lu_target *lut = class_exp2tgt(exp);
206
207         LASSERT(exp != exp->exp_obd->obd_self_export);
208
209         OBD_FREE_PTR(ted->ted_lcd);
210         ted->ted_lcd = NULL;
211
212         /* Slot may be not yet assigned */
213         if (ted->ted_lr_idx < 0)
214                 return;
215         /* Clear bit when lcd is freed */
216         LASSERT(lut->lut_client_bitmap);
217         if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
218                 CERROR("%s: client %u bit already clear in bitmap\n",
219                        exp->exp_obd->obd_name, ted->ted_lr_idx);
220                 LBUG();
221         }
222 }
223 EXPORT_SYMBOL(lut_client_free);
224
225 int lut_client_data_read(const struct lu_env *env, struct lu_target *tg,
226                          struct lsd_client_data *lcd, loff_t *off, int index)
227 {
228         struct tg_thread_info *tti = tg_th_info(env);
229         int rc;
230
231         tti_buf_lcd(tti);
232         rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, off);
233         if (rc == 0) {
234                 check_lcd(tg->lut_obd->obd_name, index, &tti->tti_lcd);
235                 lcd_le_to_cpu(&tti->tti_lcd, lcd);
236         }
237
238         CDEBUG(D_INFO, "read lcd @%lld rc = %d, uuid = %s, last_transno = "LPU64
239                ", last_xid = "LPU64", last_result = %u, last_data = %u, "
240                "last_close_transno = "LPU64", last_close_xid = "LPU64", "
241                "last_close_result = %u\n", *off,
242                rc, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
243                lcd->lcd_last_result, lcd->lcd_last_data,
244                lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
245                lcd->lcd_last_close_result);
246         return rc;
247 }
248 EXPORT_SYMBOL(lut_client_data_read);
249
250 int lut_client_data_write(const struct lu_env *env, struct lu_target *tg,
251                           struct lsd_client_data *lcd, loff_t *off,
252                           struct thandle *th)
253 {
254         struct tg_thread_info *tti = tg_th_info(env);
255
256         lcd_cpu_to_le(lcd, &tti->tti_lcd);
257         tti_buf_lcd(tti);
258
259         return dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf, off, th);
260 }
261 EXPORT_SYMBOL(lut_client_data_write);
262
263 /**
264  * Update client data in last_rcvd
265  */
266 int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
267 {
268         struct tg_export_data *ted = &exp->exp_target_data;
269         struct lu_target      *tg = class_exp2tgt(exp);
270         struct tg_thread_info *tti = tg_th_info(env);
271         struct thandle        *th;
272         int                    rc = 0;
273
274         ENTRY;
275
276         th = dt_trans_create(env, tg->lut_bottom);
277         if (IS_ERR(th))
278                 RETURN(PTR_ERR(th));
279
280         rc = dt_declare_record_write(env, tg->lut_last_rcvd,
281                                      sizeof(struct lsd_client_data),
282                                      ted->ted_lr_off, th);
283         if (rc)
284                 GOTO(out, rc);
285
286         rc = dt_trans_start_local(env, tg->lut_bottom, th);
287         if (rc)
288                 GOTO(out, rc);
289         /*
290          * Until this operations will be committed the sync is needed
291          * for this export. This should be done _after_ starting the
292          * transaction so that many connecting clients will not bring
293          * server down with lots of sync writes.
294          */
295         rc = lut_new_client_cb_add(th, exp);
296         if (rc) {
297                 /* can't add callback, do sync now */
298                 th->th_sync = 1;
299         } else {
300                 cfs_spin_lock(&exp->exp_lock);
301                 exp->exp_need_sync = 1;
302                 cfs_spin_unlock(&exp->exp_lock);
303         }
304
305         tti->tti_off = ted->ted_lr_off;
306         rc = lut_client_data_write(env, tg, ted->ted_lcd, &tti->tti_off, th);
307         EXIT;
308 out:
309         dt_trans_stop(env, tg->lut_bottom, th);
310         CDEBUG(D_INFO, "write last_rcvd, rc = %d:\n"
311                "uuid = %s\nlast_transno = "LPU64"\n",
312                rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
313
314         return rc;
315 }
316
317 int lut_server_data_read(const struct lu_env *env, struct lu_target *tg)
318 {
319         struct tg_thread_info *tti = tg_th_info(env);
320         int rc;
321
322         tti->tti_off = 0;
323         tti_buf_lsd(tti);
324         rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, &tti->tti_off);
325         if (rc == 0)
326                 lsd_le_to_cpu(&tti->tti_lsd, &tg->lut_lsd);
327
328         CDEBUG(D_INFO, "%s: read last_rcvd header, rc = %d, uuid = %s, "
329                "last_transno = "LPU64"\n", tg->lut_obd->obd_name, rc,
330                tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
331         return rc;
332 }
333 EXPORT_SYMBOL(lut_server_data_read);
334
335 int lut_server_data_write(const struct lu_env *env, struct lu_target *tg,
336                           struct thandle *th)
337 {
338         struct tg_thread_info *tti = tg_th_info(env);
339         int rc;
340         ENTRY;
341
342         tti->tti_off = 0;
343         tti_buf_lsd(tti);
344         lsd_cpu_to_le(&tg->lut_lsd, &tti->tti_lsd);
345
346         rc = dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf,
347                              &tti->tti_off, th);
348
349         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
350                "uuid = %s\nlast_transno = "LPU64"\n",
351                rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
352
353         RETURN(rc);
354 }
355 EXPORT_SYMBOL(lut_server_data_write);
356
357 /**
358  * Update server data in last_rcvd
359  */
360 int lut_server_data_update(const struct lu_env *env, struct lu_target *tg,
361                            int sync)
362 {
363         struct tg_thread_info *tti = tg_th_info(env);
364         struct thandle  *th;
365         int                 rc = 0;
366
367         ENTRY;
368
369         CDEBUG(D_SUPER,
370                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
371                tg->lut_lsd.lsd_uuid, tg->lut_obd->u.obt.obt_mount_count,
372                tg->lut_last_transno);
373
374         /* Always save latest transno to keep it fresh */
375         cfs_spin_lock(&tg->lut_translock);
376         tg->lut_lsd.lsd_last_transno = tg->lut_last_transno;
377         cfs_spin_unlock(&tg->lut_translock);
378
379         th = dt_trans_create(env, tg->lut_bottom);
380         if (IS_ERR(th))
381                 RETURN(PTR_ERR(th));
382
383         th->th_sync = sync;
384
385         rc = dt_declare_record_write(env, tg->lut_last_rcvd,
386                                      sizeof(struct lr_server_data),
387                                      tti->tti_off, th);
388         if (rc)
389                 GOTO(out, rc);
390
391         rc = dt_trans_start(env, tg->lut_bottom, th);
392         if (rc)
393                 GOTO(out, rc);
394
395         rc = lut_server_data_write(env, tg, th);
396 out:
397         dt_trans_stop(env, tg->lut_bottom, th);
398
399         CDEBUG(D_INFO, "write last_rcvd header, rc = %d:\n"
400                "uuid = %s\nlast_transno = "LPU64"\n",
401                rc, tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno);
402         RETURN(rc);
403 }
404 EXPORT_SYMBOL(lut_server_data_update);
405
406 int lut_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
407                            loff_t size)
408 {
409         struct dt_object *dt = tg->lut_last_rcvd;
410         struct thandle   *th;
411         struct lu_attr    attr;
412         int               rc;
413
414         ENTRY;
415
416         attr.la_size = size;
417         attr.la_valid = LA_SIZE;
418
419         th = dt_trans_create(env, tg->lut_bottom);
420         if (IS_ERR(th))
421                 RETURN(PTR_ERR(th));
422         rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
423         if (rc)
424                 GOTO(cleanup, rc);
425         rc = dt_declare_attr_set(env, dt, &attr, th);
426         if (rc)
427                 GOTO(cleanup, rc);
428         rc = dt_trans_start_local(env, tg->lut_bottom, th);
429         if (rc)
430                 GOTO(cleanup, rc);
431
432         rc = dt_punch(env, dt, size, OBD_OBJECT_EOF, th, BYPASS_CAPA);
433         if (rc == 0)
434                 rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
435
436 cleanup:
437         dt_trans_stop(env, tg->lut_bottom, th);
438
439         RETURN(rc);
440 }
441 EXPORT_SYMBOL(lut_truncate_last_rcvd);
442
443 void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
444 {
445         struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
446         struct lu_target *lut = class_exp2tgt(exp);
447
448         LASSERT(lut->lut_bottom);
449         /** VBR: set client last_epoch to current epoch */
450         if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
451                 return;
452         lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
453         lut_client_data_update(env, exp);
454 }
455
456 /**
457  * Update boot epoch when recovery ends
458  */
459 void lut_boot_epoch_update(struct lu_target *lut)
460 {
461         struct lu_env env;
462         struct ptlrpc_request *req;
463         __u32 start_epoch;
464         cfs_list_t client_list;
465         int rc;
466
467         if (lut->lut_obd->obd_stopping)
468                 return;
469         /** Increase server epoch after recovery */
470         if (lut->lut_bottom == NULL)
471                 return obt_boot_epoch_update(lut);
472
473         rc = lu_env_init(&env, LCT_DT_THREAD);
474         if (rc) {
475                 CERROR("Can't initialize environment rc=%d\n", rc);
476                 return;
477         }
478
479         cfs_spin_lock(&lut->lut_translock);
480         start_epoch = lr_epoch(lut->lut_last_transno) + 1;
481         lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
482         lut->lut_lsd.lsd_start_epoch = start_epoch;
483         cfs_spin_unlock(&lut->lut_translock);
484
485         CFS_INIT_LIST_HEAD(&client_list);
486         /**
487          * The recovery is not yet finished and final queue can still be updated
488          * with resend requests. Move final list to separate one for processing
489          */
490         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
491         cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
492         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
493
494         /**
495          * go through list of exports participated in recovery and
496          * set new epoch for them
497          */
498         cfs_list_for_each_entry(req, &client_list, rq_list) {
499                 LASSERT(!req->rq_export->exp_delayed);
500                 if (!req->rq_export->exp_vbr_failed)
501                         lut_client_epoch_update(&env, req->rq_export);
502         }
503         /** return list back at once */
504         cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
505         cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
506         cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
507         /** update server epoch */
508         lut_server_data_update(&env, lut, 1);
509         lu_env_fini(&env);
510 }
511 EXPORT_SYMBOL(lut_boot_epoch_update);
512
513 /**
514  * commit callback, need to update last_commited value
515  */
516 struct lut_last_committed_callback {
517         struct dt_txn_commit_cb llcc_cb;
518         struct lu_target       *llcc_lut;
519         struct obd_export      *llcc_exp;
520         __u64                   llcc_transno;
521 };
522
523 void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
524                            struct dt_txn_commit_cb *cb, int err)
525 {
526         struct lut_last_committed_callback *ccb;
527
528         ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
529
530         LASSERT(ccb->llcc_lut != NULL);
531         LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
532
533         cfs_spin_lock(&ccb->llcc_lut->lut_translock);
534         if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
535                 ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
536
537         LASSERT(ccb->llcc_exp);
538         if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
539                 ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
540                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
541                 ptlrpc_commit_replies(ccb->llcc_exp);
542         } else {
543                 cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
544         }
545         class_export_cb_put(ccb->llcc_exp);
546         if (ccb->llcc_transno)
547                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
548                        ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
549         OBD_FREE_PTR(ccb);
550 }
551
552 int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
553                            struct obd_export *exp, __u64 transno)
554 {
555         struct lut_last_committed_callback *ccb;
556         struct dt_txn_commit_cb            *dcb;
557         int                                rc;
558
559         OBD_ALLOC_PTR(ccb);
560         if (ccb == NULL)
561                 return -ENOMEM;
562
563         ccb->llcc_lut     = lut;
564         ccb->llcc_exp     = class_export_cb_get(exp);
565         ccb->llcc_transno = transno;
566
567         dcb            = &ccb->llcc_cb;
568         dcb->dcb_func  = lut_cb_last_committed;
569         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
570         strncpy(dcb->dcb_name, "lut_cb_last_committed", MAX_COMMIT_CB_STR_LEN);
571         dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
572
573         rc = dt_trans_cb_add(th, dcb);
574         if (rc) {
575                 class_export_cb_put(exp);
576                 OBD_FREE_PTR(ccb);
577         }
578
579         if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
580                 /* report failure to force synchronous operation */
581                 return -EPERM;
582
583         return rc;
584 }
585 EXPORT_SYMBOL(lut_last_commit_cb_add);
586
587 struct lut_new_client_callback {
588         struct dt_txn_commit_cb lncc_cb;
589         struct obd_export      *lncc_exp;
590 };
591
592 void lut_cb_new_client(struct lu_env *env, struct thandle *th,
593                        struct dt_txn_commit_cb *cb, int err)
594 {
595         struct lut_new_client_callback *ccb;
596
597         ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
598
599         LASSERT(ccb->lncc_exp->exp_obd);
600
601         CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
602                ccb->lncc_exp->exp_obd->obd_name,
603                ccb->lncc_exp->exp_client_uuid.uuid);
604
605         cfs_spin_lock(&ccb->lncc_exp->exp_lock);
606         ccb->lncc_exp->exp_need_sync = 0;
607         cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
608         class_export_cb_put(ccb->lncc_exp);
609
610         OBD_FREE_PTR(ccb);
611 }
612
613 int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
614 {
615         struct lut_new_client_callback *ccb;
616         struct dt_txn_commit_cb        *dcb;
617         int                            rc;
618
619         OBD_ALLOC_PTR(ccb);
620         if (ccb == NULL)
621                 return -ENOMEM;
622
623         ccb->lncc_exp  = class_export_cb_get(exp);
624
625         dcb            = &ccb->lncc_cb;
626         dcb->dcb_func  = lut_cb_new_client;
627         CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
628         strncpy(dcb->dcb_name, "lut_cb_new_client", MAX_COMMIT_CB_STR_LEN);
629         dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
630
631         rc = dt_trans_cb_add(th, dcb);
632         if (rc) {
633                 class_export_cb_put(exp);
634                 OBD_FREE_PTR(ccb);
635         }
636         return rc;
637 }
638
639 /**
640  * Add new client to the last_rcvd upon new connection.
641  *
642  * We use a bitmap to locate a free space in the last_rcvd file and initialize
643  * tg_export_data.
644  */
645 int lut_client_new(const struct lu_env *env, struct obd_export *exp)
646 {
647         struct tg_export_data *ted = &exp->exp_target_data;
648         struct lu_target *tg = class_exp2tgt(exp);
649         int rc = 0, idx;
650
651         ENTRY;
652
653         LASSERT(tg->lut_client_bitmap != NULL);
654         if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid))
655                 RETURN(0);
656
657         cfs_mutex_init(&ted->ted_lcd_lock);
658
659         if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
660                 RETURN(0);
661
662         /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
663          * there's no need for extra complication here
664          */
665         idx = cfs_find_first_zero_bit(tg->lut_client_bitmap, LR_MAX_CLIENTS);
666 repeat:
667         if (idx >= LR_MAX_CLIENTS ||
668             OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
669                 CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
670                        tg->lut_obd->obd_name,  idx);
671                 RETURN(-EOVERFLOW);
672         }
673         if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
674                 idx = cfs_find_next_zero_bit(tg->lut_client_bitmap,
675                                              LR_MAX_CLIENTS, idx);
676                 goto repeat;
677         }
678
679         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
680                tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
681
682         ted->ted_lr_idx = idx;
683         ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
684                           idx * tg->lut_lsd.lsd_client_size;
685
686         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
687
688         CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
689                tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
690                ted->ted_lcd->lcd_uuid);
691
692         if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
693                 RETURN(-ENOSPC);
694
695         rc = lut_client_data_update(env, exp);
696         if (rc)
697                 CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
698                        tg->lut_obd->obd_name, idx, rc);
699
700         RETURN(rc);
701 }
702 EXPORT_SYMBOL(lut_client_new);
703
704 /* Add client data to the MDS.  We use a bitmap to locate a free space
705  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
706  * Otherwise, we just have to read the data from the last_rcvd file and
707  * we know its offset.
708  *
709  * It should not be possible to fail adding an existing client - otherwise
710  * mdt_init_server_data() callsite needs to be fixed.
711  */
712 int lut_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
713 {
714         struct tg_export_data *ted = &exp->exp_target_data;
715         struct lu_target *tg = class_exp2tgt(exp);
716
717         ENTRY;
718
719         LASSERT(tg->lut_client_bitmap != NULL);
720         LASSERTF(idx >= 0, "%d\n", idx);
721
722         if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid) ||
723             (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
724                 RETURN(0);
725
726         if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
727                 CERROR("%s: client %d: bit already set in bitmap!!\n",
728                        tg->lut_obd->obd_name,  idx);
729                 LBUG();
730         }
731
732         CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
733                tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
734
735         ted->ted_lr_idx = idx;
736         ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
737                           idx * tg->lut_lsd.lsd_client_size;
738
739         cfs_mutex_init(&ted->ted_lcd_lock);
740
741         LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
742
743         RETURN(0);
744 }
745 EXPORT_SYMBOL(lut_client_add);
746
747 int lut_client_del(const struct lu_env *env, struct obd_export *exp)
748 {
749         struct tg_export_data *ted = &exp->exp_target_data;
750         struct lu_target      *tg = class_exp2tgt(exp);
751         int                    rc;
752
753         ENTRY;
754
755         LASSERT(ted->ted_lcd);
756
757         /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
758         if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
759                     (char *)tg->lut_obd->obd_uuid.uuid) ||
760             (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
761                 RETURN(0);
762
763         CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
764                tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
765                ted->ted_lcd->lcd_uuid);
766
767         /* Clear the bit _after_ zeroing out the client so we don't
768            race with filter_client_add and zero out new clients.*/
769         if (!cfs_test_bit(ted->ted_lr_idx, tg->lut_client_bitmap)) {
770                 CERROR("%s: client %u: bit already clear in bitmap!!\n",
771                        tg->lut_obd->obd_name, ted->ted_lr_idx);
772                 LBUG();
773         }
774
775         /* Do not erase record for recoverable client. */
776         if (exp->exp_flags & OBD_OPT_FAILOVER)
777                 RETURN(0);
778
779         /* Make sure the server's last_transno is up to date.
780          * This should be done before zeroing client slot so last_transno will
781          * be in server data or in client data in case of failure */
782         rc = lut_server_data_update(env, tg, 0);
783         if (rc != 0) {
784                 CERROR("%s: failed to update server data, skip client %s "
785                        "zeroing, rc %d\n", tg->lut_obd->obd_name,
786                        ted->ted_lcd->lcd_uuid, rc);
787                 RETURN(rc);
788         }
789
790         cfs_mutex_lock(&ted->ted_lcd_lock);
791         memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
792         rc = lut_client_data_update(env, exp);
793         cfs_mutex_unlock(&ted->ted_lcd_lock);
794
795         CDEBUG(rc == 0 ? D_INFO : D_ERROR,
796                "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
797                tg->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
798                ted->ted_lr_idx, ted->ted_lr_off, rc);
799         RETURN(rc);
800 }
801 EXPORT_SYMBOL(lut_client_del);
802
803 int lut_init(const struct lu_env *env, struct lu_target *lut,
804              struct obd_device *obd, struct dt_device *dt)
805 {
806         struct dt_object_format dof;
807         struct lu_attr          attr;
808         struct lu_fid           fid;
809         struct dt_object       *o;
810         int                     rc = 0;
811         ENTRY;
812
813         LASSERT(lut);
814         LASSERT(obd);
815         lut->lut_obd = obd;
816         lut->lut_bottom = dt;
817         lut->lut_last_rcvd = NULL;
818         obd->u.obt.obt_lut = lut;
819
820         cfs_spin_lock_init(&lut->lut_translock);
821
822         OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
823         if (lut->lut_client_bitmap == NULL)
824                 RETURN(-ENOMEM);
825
826         /** obdfilter has no lu_device stack yet */
827         if (dt == NULL)
828                 RETURN(rc);
829
830         memset(&attr, 0, sizeof(attr));
831         attr.la_valid = LA_MODE;
832         attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
833         dof.dof_type = dt_mode_to_dft(S_IFREG);
834
835         lu_local_obj_fid(&fid, MDT_LAST_RECV_OID);
836
837         o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
838         if (!IS_ERR(o)) {
839                 lut->lut_last_rcvd = o;
840         } else {
841                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
842                 lut->lut_client_bitmap = NULL;
843                 rc = PTR_ERR(o);
844                 CERROR("cannot open %s: rc = %d\n", LAST_RCVD, rc);
845         }
846
847         RETURN(rc);
848 }
849 EXPORT_SYMBOL(lut_init);
850
851 void lut_fini(const struct lu_env *env, struct lu_target *lut)
852 {
853         ENTRY;
854
855         if (lut->lut_client_bitmap) {
856                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
857                 lut->lut_client_bitmap = NULL;
858         }
859         if (lut->lut_last_rcvd) {
860                 lu_object_put(env, &lut->lut_last_rcvd->do_lu);
861                 lut->lut_last_rcvd = NULL;
862         }
863         EXIT;
864 }
865 EXPORT_SYMBOL(lut_fini);
866
867 /* context key constructor/destructor: tg_key_init, tg_key_fini */
868 LU_KEY_INIT_FINI(tg, struct tg_thread_info);
869 /* context key: tg_thread_key */
870 LU_CONTEXT_KEY_DEFINE(tg, LCT_MD_THREAD|LCT_DT_THREAD);
871 LU_KEY_INIT_GENERIC(tg);
872 EXPORT_SYMBOL(tg_thread_key);
873
874 int lut_mod_init(void)
875 {
876         tg_key_init_generic(&tg_thread_key, NULL);
877         lu_context_key_register_many(&tg_thread_key, NULL);
878         return 0;
879 }
880
881 void lut_mod_exit(void)
882 {
883         lu_context_key_degister_many(&tg_thread_key, NULL);
884 }
885