Whamcloud - gitweb
LU-2349 osp: Move log ops init to module init.
[fs/lustre-release.git] / lustre / osp / osp_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osp/osp_dev.c
37  *
38  * Lustre OST Proxy Device
39  *
40  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
41  * Author: Mikhail Pershin <mike.pershin@intel.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <obd_class.h>
50 #include <lustre_param.h>
51 #include <lustre_log.h>
52
53 #include "osp_internal.h"
54
55 /* Slab for OSP object allocation */
56 cfs_mem_cache_t *osp_object_kmem;
57
58 static struct lu_kmem_descr osp_caches[] = {
59         {
60                 .ckd_cache = &osp_object_kmem,
61                 .ckd_name  = "osp_obj",
62                 .ckd_size  = sizeof(struct osp_object)
63         },
64         {
65                 .ckd_cache = NULL
66         }
67 };
68
69 struct lu_object *osp_object_alloc(const struct lu_env *env,
70                                    const struct lu_object_header *hdr,
71                                    struct lu_device *d)
72 {
73         struct lu_object_header *h;
74         struct osp_object       *o;
75         struct lu_object        *l;
76
77         LASSERT(hdr == NULL);
78
79         OBD_SLAB_ALLOC_PTR_GFP(o, osp_object_kmem, CFS_ALLOC_IO);
80         if (o != NULL) {
81                 l = &o->opo_obj.do_lu;
82                 h = &o->opo_header;
83
84                 lu_object_header_init(h);
85                 dt_object_init(&o->opo_obj, h, d);
86                 lu_object_add_top(h, l);
87
88                 l->lo_ops = &osp_lu_obj_ops;
89
90                 return l;
91         } else {
92                 return NULL;
93         }
94 }
95
96 /* Update opd_last_used_id along with checking for gap in objid sequence */
97 void osp_update_last_id(struct osp_device *d, obd_id objid)
98 {
99         /*
100          * we might have lost precreated objects due to VBR and precreate
101          * orphans, the gap in objid can be calculated properly only here
102          */
103         if (objid > le64_to_cpu(d->opd_last_used_id)) {
104                 if (objid - le64_to_cpu(d->opd_last_used_id) > 1) {
105                         d->opd_gap_start = le64_to_cpu(d->opd_last_used_id) + 1;
106                         d->opd_gap_count = objid - d->opd_gap_start;
107                         CDEBUG(D_HA, "Gap in objids: %d, start = %llu\n",
108                                d->opd_gap_count, d->opd_gap_start);
109                 }
110                 d->opd_last_used_id = cpu_to_le64(objid);
111         }
112 }
113
114 static int osp_last_used_init(const struct lu_env *env, struct osp_device *m)
115 {
116         struct osp_thread_info  *osi = osp_env_info(env);
117         struct dt_object_format  dof = { 0 };
118         struct dt_object        *o;
119         int                      rc;
120
121         ENTRY;
122
123         osi->osi_attr.la_valid = LA_MODE;
124         osi->osi_attr.la_mode = S_IFREG | 0644;
125         lu_local_obj_fid(&osi->osi_fid, MDD_LOV_OBJ_OID);
126         dof.dof_type = DFT_REGULAR;
127         o = dt_find_or_create(env, m->opd_storage, &osi->osi_fid, &dof,
128                               &osi->osi_attr);
129         if (IS_ERR(o))
130                 RETURN(PTR_ERR(o));
131
132         rc = dt_attr_get(env, o, &osi->osi_attr, NULL);
133         if (rc)
134                 GOTO(out, rc);
135
136         /* object will be released in device cleanup path */
137         m->opd_last_used_file = o;
138
139         if (osi->osi_attr.la_size >= sizeof(osi->osi_id) *
140                                      (m->opd_index + 1)) {
141                 osp_objid_buf_prep(osi, m, m->opd_index);
142                 rc = dt_record_read(env, o, &osi->osi_lb, &osi->osi_off);
143                 if (rc != 0)
144                         GOTO(out, rc);
145         } else {
146                 /* reset value to 0, just to make sure and change file's size */
147                 struct thandle *th;
148
149                 m->opd_last_used_id = 0;
150                 osp_objid_buf_prep(osi, m, m->opd_index);
151
152                 th = dt_trans_create(env, m->opd_storage);
153                 if (IS_ERR(th))
154                         GOTO(out, rc = PTR_ERR(th));
155
156                 rc = dt_declare_record_write(env, m->opd_last_used_file,
157                                              osi->osi_lb.lb_len, osi->osi_off,
158                                              th);
159                 if (rc) {
160                         dt_trans_stop(env, m->opd_storage, th);
161                         GOTO(out, rc);
162                 }
163
164                 rc = dt_trans_start_local(env, m->opd_storage, th);
165                 if (rc) {
166                         dt_trans_stop(env, m->opd_storage, th);
167                         GOTO(out, rc);
168                 }
169
170                 rc = dt_record_write(env, m->opd_last_used_file, &osi->osi_lb,
171                                      &osi->osi_off, th);
172                 dt_trans_stop(env, m->opd_storage, th);
173                 if (rc)
174                         GOTO(out, rc);
175         }
176         CDEBUG(D_HA, "%s: Read last used ID: "LPU64"\n", m->opd_obd->obd_name,
177                le64_to_cpu(m->opd_last_used_id));
178         RETURN(0);
179 out:
180         CERROR("%s: can't initialize lov_objid: %d\n",
181                m->opd_obd->obd_name, rc);
182         lu_object_put(env, &o->do_lu);
183         m->opd_last_used_file = NULL;
184         return rc;
185 }
186
187 static void osp_last_used_fini(const struct lu_env *env, struct osp_device *d)
188 {
189         if (d->opd_last_used_file != NULL) {
190                 lu_object_put(env, &d->opd_last_used_file->do_lu);
191                 d->opd_last_used_file = NULL;
192         }
193 }
194
195 int osp_disconnect(struct osp_device *d)
196 {
197         struct obd_import *imp;
198         int rc = 0;
199
200         imp = d->opd_obd->u.cli.cl_import;
201
202         /* Mark import deactivated now, so we don't try to reconnect if any
203          * of the cleanup RPCs fails (e.g. ldlm cancel, etc).  We don't
204          * fully deactivate the import, or that would drop all requests. */
205         LASSERT(imp != NULL);
206         spin_lock(&imp->imp_lock);
207         imp->imp_deactive = 1;
208         spin_unlock(&imp->imp_lock);
209
210         ptlrpc_deactivate_import(imp);
211
212         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
213          * delete it regardless.  (It's safe to delete an import that was
214          * never added.) */
215         (void)ptlrpc_pinger_del_import(imp);
216
217         rc = ptlrpc_disconnect_import(imp, 0);
218         if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ESHUTDOWN)
219                 rc = 0;
220         if (rc)
221                 CERROR("%s: can't disconnect: rc = %d\n",
222                        d->opd_obd->obd_name, rc);
223
224         ptlrpc_invalidate_import(imp);
225
226         RETURN(rc);
227 }
228
229 static int osp_shutdown(const struct lu_env *env, struct osp_device *d)
230 {
231         int                      rc = 0;
232         ENTRY;
233
234         if (is_osp_on_ost(d->opd_obd->obd_name)) {
235                 rc = osp_disconnect(d);
236                 RETURN(rc);
237         }
238
239         LASSERT(env);
240         /* release last_used file */
241         osp_last_used_fini(env, d);
242
243         rc = osp_disconnect(d);
244
245         /* stop precreate thread */
246         osp_precreate_fini(d);
247
248         /* stop sync thread */
249         osp_sync_fini(d);
250
251         RETURN(rc);
252 }
253
254 static int osp_process_config(const struct lu_env *env,
255                               struct lu_device *dev, struct lustre_cfg *lcfg)
256 {
257         struct osp_device               *d = lu2osp_dev(dev);
258         struct lprocfs_static_vars       lvars = { 0 };
259         int                              rc;
260
261         ENTRY;
262
263         switch (lcfg->lcfg_command) {
264         case LCFG_CLEANUP:
265                 if (!is_osp_on_ost(d->opd_obd->obd_name))
266                         lu_dev_del_linkage(dev->ld_site, dev);
267                 rc = osp_shutdown(env, d);
268                 break;
269         case LCFG_PARAM:
270                 lprocfs_osp_init_vars(&lvars);
271
272                 LASSERT(d->opd_obd);
273                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
274                                               lcfg, d->opd_obd);
275                 if (rc > 0)
276                         rc = 0;
277                 if (rc == -ENOSYS) {
278                         /* class_process_proc_param() haven't found matching
279                          * parameter and returned ENOSYS so that layer(s)
280                          * below could use that. But OSP is the bottom, so
281                          * just ignore it */
282                         CERROR("%s: unknown param %s\n",
283                                (char *)lustre_cfg_string(lcfg, 0),
284                                (char *)lustre_cfg_string(lcfg, 1));
285                         rc = 0;
286                 }
287                 break;
288         default:
289                 CERROR("%s: unknown command %u\n",
290                        (char *)lustre_cfg_string(lcfg, 0), lcfg->lcfg_command);
291                 rc = 0;
292                 break;
293         }
294
295         RETURN(rc);
296 }
297
298 static int osp_recovery_complete(const struct lu_env *env,
299                                  struct lu_device *dev)
300 {
301         struct osp_device       *osp = lu2osp_dev(dev);
302         int                      rc = 0;
303
304         ENTRY;
305         osp->opd_recovery_completed = 1;
306         cfs_waitq_signal(&osp->opd_pre_waitq);
307         RETURN(rc);
308 }
309
310 const struct lu_device_operations osp_lu_ops = {
311         .ldo_object_alloc       = osp_object_alloc,
312         .ldo_process_config     = osp_process_config,
313         .ldo_recovery_complete  = osp_recovery_complete,
314 };
315
316 /**
317  * provides with statfs from corresponded OST
318  *
319  */
320 static int osp_statfs(const struct lu_env *env, struct dt_device *dev,
321                       struct obd_statfs *sfs)
322 {
323         struct osp_device *d = dt2osp_dev(dev);
324
325         ENTRY;
326
327         if (unlikely(d->opd_imp_active == 0))
328                 RETURN(-ENOTCONN);
329
330         /* return recently updated data */
331         *sfs = d->opd_statfs;
332
333         /*
334          * layer above osp (usually lod) can use ffree to estimate
335          * how many objects are available for immediate creation
336          */
337         spin_lock(&d->opd_pre_lock);
338         sfs->os_fprecreated = d->opd_pre_last_created - d->opd_pre_used_id;
339         sfs->os_fprecreated -= d->opd_pre_reserved;
340         spin_unlock(&d->opd_pre_lock);
341
342         LASSERT(sfs->os_fprecreated <= OST_MAX_PRECREATE * 2);
343
344         CDEBUG(D_OTHER, "%s: "LPU64" blocks, "LPU64" free, "LPU64" avail, "
345                LPU64" files, "LPU64" free files\n", d->opd_obd->obd_name,
346                sfs->os_blocks, sfs->os_bfree, sfs->os_bavail,
347                sfs->os_files, sfs->os_ffree);
348         RETURN(0);
349 }
350
351 static int osp_sync(const struct lu_env *env, struct dt_device *dev)
352 {
353         ENTRY;
354
355         /*
356          * XXX: wake up sync thread, command it to start flushing asap?
357          */
358
359         RETURN(0);
360 }
361
362 const struct dt_device_operations osp_dt_ops = {
363         .dt_statfs      = osp_statfs,
364         .dt_sync        = osp_sync,
365 };
366
367 static int osp_connect_to_osd(const struct lu_env *env, struct osp_device *m,
368                               const char *nextdev)
369 {
370         struct obd_connect_data *data = NULL;
371         struct obd_device       *obd;
372         int                      rc;
373
374         ENTRY;
375
376         LASSERT(m->opd_storage_exp == NULL);
377
378         OBD_ALLOC_PTR(data);
379         if (data == NULL)
380                 RETURN(-ENOMEM);
381
382         obd = class_name2obd(nextdev);
383         if (obd == NULL) {
384                 CERROR("%s: can't locate next device: %s\n",
385                        m->opd_obd->obd_name, nextdev);
386                 GOTO(out, rc = -ENOTCONN);
387         }
388
389         rc = obd_connect(env, &m->opd_storage_exp, obd, &obd->obd_uuid, data,
390                          NULL);
391         if (rc) {
392                 CERROR("%s: cannot connect to next dev %s: rc = %d\n",
393                        m->opd_obd->obd_name, nextdev, rc);
394                 GOTO(out, rc);
395         }
396
397         m->opd_dt_dev.dd_lu_dev.ld_site =
398                 m->opd_storage_exp->exp_obd->obd_lu_dev->ld_site;
399         LASSERT(m->opd_dt_dev.dd_lu_dev.ld_site);
400         m->opd_storage = lu2dt_dev(m->opd_storage_exp->exp_obd->obd_lu_dev);
401
402 out:
403         OBD_FREE_PTR(data);
404         RETURN(rc);
405 }
406
407 static int osp_init0(const struct lu_env *env, struct osp_device *m,
408                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
409 {
410         struct obd_device               *obd;
411         struct obd_import               *imp;
412         class_uuid_t                     uuid;
413         char                            *src, *ost, *mdt, *osdname = NULL;
414         int                              rc, idx;
415
416         ENTRY;
417
418         obd = class_name2obd(lustre_cfg_string(cfg, 0));
419         if (obd == NULL) {
420                 CERROR("Cannot find obd with name %s\n",
421                        lustre_cfg_string(cfg, 0));
422                 RETURN(-ENODEV);
423         }
424         m->opd_obd = obd;
425
426         /* There is no record in the MDT configuration for the local disk
427          * device, so we have to extract this from elsewhere in the profile.
428          * The only information we get at setup is from the OSC records:
429          * setup 0:{fsname}-OSTxxxx-osc[-MDTxxxx] 1:lustre-OST0000_UUID 2:NID
430          * Note that 1.8 generated configs are missing the -MDTxxxx part.
431          * We need to reconstruct the name of the underlying OSD from this:
432          * {fsname}-{svname}-osd, for example "lustre-MDT0000-osd".  We
433          * also need to determine the OST index from this - will be used
434          * to calculate the offset in shared lov_objids file later */
435
436         src = lustre_cfg_string(cfg, 0);
437         if (src == NULL)
438                 RETURN(-EINVAL);
439
440         ost = strstr(src, "-OST");
441         if (ost == NULL)
442                 RETURN(-EINVAL);
443
444         idx = simple_strtol(ost + 4, &mdt, 16);
445         if (mdt[0] != '-' || idx > INT_MAX || idx < 0) {
446                 CERROR("%s: invalid OST index in '%s'\n", obd->obd_name, src);
447                 GOTO(out_fini, rc = -EINVAL);
448         }
449         m->opd_index = idx;
450
451         idx = ost - src;
452         /* check the fsname length, and after this everything else will fit */
453         if (idx > MTI_NAME_MAXLEN) {
454                 CERROR("%s: fsname too long in '%s'\n", obd->obd_name, src);
455                 GOTO(out_fini, rc = -EINVAL);
456         }
457
458         OBD_ALLOC(osdname, MAX_OBD_NAME);
459         if (osdname == NULL)
460                 GOTO(out_fini, rc = -ENOMEM);
461
462         memcpy(osdname, src, idx); /* copy just the fsname part */
463         osdname[idx] = '\0';
464
465         mdt = strstr(mdt, "-MDT");
466         if (mdt == NULL) /* 1.8 configs don't have "-MDT0000" at the end */
467                 strcat(osdname, "-MDT0000");
468         else
469                 strcat(osdname, mdt);
470         strcat(osdname, "-osd");
471         CDEBUG(D_HA, "%s: connect to %s (%s)\n", obd->obd_name, osdname, src);
472
473         m->opd_dt_dev.dd_lu_dev.ld_ops = &osp_lu_ops;
474         m->opd_dt_dev.dd_ops = &osp_dt_ops;
475         obd->obd_lu_dev = &m->opd_dt_dev.dd_lu_dev;
476
477         rc = osp_connect_to_osd(env, m, osdname);
478         if (rc)
479                 GOTO(out_fini, rc);
480
481         rc = ptlrpcd_addref();
482         if (rc)
483                 GOTO(out_disconnect, rc);
484
485         rc = client_obd_setup(obd, cfg);
486         if (rc) {
487                 CERROR("%s: can't setup obd: %d\n", m->opd_obd->obd_name, rc);
488                 GOTO(out_ref, rc);
489         }
490
491         osp_lprocfs_init(m);
492
493         /*
494          * Initialize last id from the storage - will be used in orphan cleanup
495          */
496         rc = osp_last_used_init(env, m);
497         if (rc)
498                 GOTO(out_proc, rc);
499
500         /*
501          * Initialize precreation thread, it handles new connections as well
502          */
503         rc = osp_init_precreate(m);
504         if (rc)
505                 GOTO(out_last_used, rc);
506
507         /*
508          * Initialize synhronization mechanism taking care of propogating
509          * changes to OST in near transactional manner
510          */
511         rc = osp_sync_init(env, m);
512         if (rc)
513                 GOTO(out_precreat, rc);
514
515         /*
516          * Initiate connect to OST
517          */
518         ll_generate_random_uuid(uuid);
519         class_uuid_unparse(uuid, &m->opd_cluuid);
520
521         imp = obd->u.cli.cl_import;
522
523         rc = ptlrpc_init_import(imp);
524         if (rc)
525                 GOTO(out, rc);
526         if (osdname)
527                 OBD_FREE(osdname, MAX_OBD_NAME);
528         RETURN(0);
529
530 out:
531         /* stop sync thread */
532         osp_sync_fini(m);
533 out_precreat:
534         /* stop precreate thread */
535         osp_precreate_fini(m);
536 out_last_used:
537         osp_last_used_fini(env, m);
538 out_proc:
539         ptlrpc_lprocfs_unregister_obd(obd);
540         lprocfs_obd_cleanup(obd);
541         class_destroy_import(obd->u.cli.cl_import);
542         client_obd_cleanup(obd);
543 out_ref:
544         ptlrpcd_decref();
545 out_disconnect:
546         obd_disconnect(m->opd_storage_exp);
547 out_fini:
548         if (osdname)
549                 OBD_FREE(osdname, MAX_OBD_NAME);
550         RETURN(rc);
551 }
552
553 static struct lu_device *osp_device_free(const struct lu_env *env,
554                                          struct lu_device *lu)
555 {
556         struct osp_device *m = lu2osp_dev(lu);
557
558         ENTRY;
559
560         if (cfs_atomic_read(&lu->ld_ref) && lu->ld_site) {
561                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
562                 lu_site_print(env, lu->ld_site, &msgdata, lu_cdebug_printer);
563         }
564         dt_device_fini(&m->opd_dt_dev);
565         OBD_FREE_PTR(m);
566         RETURN(NULL);
567 }
568
569 static struct lu_device *osp_device_alloc(const struct lu_env *env,
570                                           struct lu_device_type *t,
571                                           struct lustre_cfg *lcfg)
572 {
573         struct osp_device *m;
574         struct lu_device  *l;
575
576         OBD_ALLOC_PTR(m);
577         if (m == NULL) {
578                 l = ERR_PTR(-ENOMEM);
579         } else {
580                 int rc;
581
582                 l = osp2lu_dev(m);
583                 dt_device_init(&m->opd_dt_dev, t);
584                 if (is_osp_on_ost(lustre_cfg_string(lcfg, 0)))
585                         rc = osp_init_for_ost(env, m, t, lcfg);
586                 else
587                         rc = osp_init0(env, m, t, lcfg);
588                 if (rc != 0) {
589                         osp_device_free(env, l);
590                         l = ERR_PTR(rc);
591                 }
592         }
593         return l;
594 }
595
596 static struct lu_device *osp_device_fini(const struct lu_env *env,
597                                          struct lu_device *d)
598 {
599         struct osp_device *m = lu2osp_dev(d);
600         struct obd_import *imp;
601         int                rc;
602
603         ENTRY;
604
605         if (m->opd_storage_exp)
606                 obd_disconnect(m->opd_storage_exp);
607
608         if (is_osp_on_ost(m->opd_obd->obd_name))
609                 osp_fini_for_ost(m);
610
611         imp = m->opd_obd->u.cli.cl_import;
612
613         if (imp->imp_rq_pool) {
614                 ptlrpc_free_rq_pool(imp->imp_rq_pool);
615                 imp->imp_rq_pool = NULL;
616         }
617
618         obd_cleanup_client_import(m->opd_obd);
619
620         if (m->opd_symlink)
621                 lprocfs_remove(&m->opd_symlink);
622
623         LASSERT(m->opd_obd);
624         ptlrpc_lprocfs_unregister_obd(m->opd_obd);
625         lprocfs_obd_cleanup(m->opd_obd);
626
627         rc = client_obd_cleanup(m->opd_obd);
628         LASSERTF(rc == 0, "error %d\n", rc);
629
630         ptlrpcd_decref();
631
632         RETURN(NULL);
633 }
634
/*
 * Reconnect handler: nothing to do, always reports success.
 */
static int osp_reconnect(const struct lu_env *env,
                         struct obd_export *exp, struct obd_device *obd,
                         struct obd_uuid *cluuid,
                         struct obd_connect_data *data, void *localdata)
{
        return 0;
}
643
644 /*
645  * we use exports to track all LOD users
646  */
647 static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp,
648                            struct obd_device *obd, struct obd_uuid *cluuid,
649                            struct obd_connect_data *data, void *localdata)
650 {
651         struct osp_device       *osp = lu2osp_dev(obd->obd_lu_dev);
652         struct obd_connect_data *ocd;
653         struct obd_import       *imp;
654         struct lustre_handle     conn;
655         int                      rc;
656
657         ENTRY;
658
659         CDEBUG(D_CONFIG, "connect #%d\n", osp->opd_connects);
660
661         rc = class_connect(&conn, obd, cluuid);
662         if (rc)
663                 RETURN(rc);
664
665         *exp = class_conn2export(&conn);
666         if (is_osp_on_ost(obd->obd_name))
667                 osp->opd_exp = *exp;
668
669         /* Why should there ever be more than 1 connect? */
670         osp->opd_connects++;
671         LASSERT(osp->opd_connects == 1);
672
673         imp = osp->opd_obd->u.cli.cl_import;
674         imp->imp_dlm_handle = conn;
675
676         ocd = &imp->imp_connect_data;
677         ocd->ocd_connect_flags = OBD_CONNECT_AT |
678                                  OBD_CONNECT_FULL20 |
679                                  OBD_CONNECT_INDEX |
680 #ifdef HAVE_LRU_RESIZE_SUPPORT
681                                  OBD_CONNECT_LRU_RESIZE |
682 #endif
683                                  OBD_CONNECT_MDS |
684                                  OBD_CONNECT_OSS_CAPA |
685                                  OBD_CONNECT_REQPORTAL |
686                                  OBD_CONNECT_SKIP_ORPHAN |
687                                  OBD_CONNECT_VERSION |
688                                  OBD_CONNECT_FID |
689                                  OBD_CONNECT_LVB_TYPE;
690
691         if (is_osp_on_ost(osp->opd_obd->obd_name))
692                 ocd->ocd_connect_flags |= OBD_CONNECT_LIGHTWEIGHT;
693
694         ocd->ocd_version = LUSTRE_VERSION_CODE;
695         LASSERT(data->ocd_connect_flags & OBD_CONNECT_INDEX);
696         ocd->ocd_index = data->ocd_index;
697         imp->imp_connect_flags_orig = ocd->ocd_connect_flags;
698
699         rc = ptlrpc_connect_import(imp);
700         if (rc) {
701                 CERROR("%s: can't connect obd: rc = %d\n", obd->obd_name, rc);
702                 GOTO(out, rc);
703         }
704
705         ptlrpc_pinger_add_import(imp);
706
707 out:
708         RETURN(rc);
709 }
710
711 /*
712  * once last export (we don't count self-export) disappeared
713  * osp can be released
714  */
715 static int osp_obd_disconnect(struct obd_export *exp)
716 {
717         struct obd_device *obd = exp->exp_obd;
718         struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
719         int                rc;
720         ENTRY;
721
722         /* Only disconnect the underlying layers on the final disconnect. */
723         LASSERT(osp->opd_connects == 1);
724         osp->opd_connects--;
725
726         rc = class_disconnect(exp);
727         if (rc) {
728                 CERROR("%s: class disconnect error: rc = %d\n",
729                        obd->obd_name, rc);
730                 RETURN(rc);
731         }
732
733         /* destroy the device */
734         if (!is_osp_on_ost(obd->obd_name))
735                 class_manual_cleanup(obd);
736
737         RETURN(rc);
738 }
739
740 /*
741  * lprocfs helpers still use OBD API, let's keep obd_statfs() support
742  */
743 static int osp_obd_statfs(const struct lu_env *env, struct obd_export *exp,
744                           struct obd_statfs *osfs, __u64 max_age, __u32 flags)
745 {
746         struct obd_statfs       *msfs;
747         struct ptlrpc_request   *req;
748         struct obd_import       *imp = NULL;
749         int                      rc;
750
751         ENTRY;
752
753         /* Since the request might also come from lprocfs, so we need
754          * sync this with client_disconnect_export Bug15684 */
755         down_read(&exp->exp_obd->u.cli.cl_sem);
756         if (exp->exp_obd->u.cli.cl_import)
757                 imp = class_import_get(exp->exp_obd->u.cli.cl_import);
758         up_read(&exp->exp_obd->u.cli.cl_sem);
759         if (!imp)
760                 RETURN(-ENODEV);
761
762         /* We could possibly pass max_age in the request (as an absolute
763          * timestamp or a "seconds.usec ago") so the target can avoid doing
764          * extra calls into the filesystem if that isn't necessary (e.g.
765          * during mount that would help a bit).  Having relative timestamps
766          * is not so great if request processing is slow, while absolute
767          * timestamps are not ideal because they need time synchronization. */
768         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
769
770         class_import_put(imp);
771
772         if (req == NULL)
773                 RETURN(-ENOMEM);
774
775         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
776         if (rc) {
777                 ptlrpc_request_free(req);
778                 RETURN(rc);
779         }
780         ptlrpc_request_set_replen(req);
781         req->rq_request_portal = OST_CREATE_PORTAL;
782         ptlrpc_at_set_req_timeout(req);
783
784         if (flags & OBD_STATFS_NODELAY) {
785                 /* procfs requests not want stat in wait for avoid deadlock */
786                 req->rq_no_resend = 1;
787                 req->rq_no_delay = 1;
788         }
789
790         rc = ptlrpc_queue_wait(req);
791         if (rc)
792                 GOTO(out, rc);
793
794         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
795         if (msfs == NULL)
796                 GOTO(out, rc = -EPROTO);
797
798         *osfs = *msfs;
799
800         EXIT;
801 out:
802         ptlrpc_req_finished(req);
803         return rc;
804 }
805
806 static int osp_import_event(struct obd_device *obd, struct obd_import *imp,
807                             enum obd_import_event event)
808 {
809         struct osp_device *d = lu2osp_dev(obd->obd_lu_dev);
810
811         switch (event) {
812         case IMP_EVENT_DISCON:
813                 d->opd_got_disconnected = 1;
814                 d->opd_imp_connected = 0;
815                 if (is_osp_on_ost(d->opd_obd->obd_name))
816                         break;
817                 osp_pre_update_status(d, -ENODEV);
818                 cfs_waitq_signal(&d->opd_pre_waitq);
819                 CDEBUG(D_HA, "got disconnected\n");
820                 break;
821         case IMP_EVENT_INACTIVE:
822                 d->opd_imp_active = 0;
823                 if (is_osp_on_ost(d->opd_obd->obd_name))
824                         break;
825                 osp_pre_update_status(d, -ENODEV);
826                 cfs_waitq_signal(&d->opd_pre_waitq);
827                 CDEBUG(D_HA, "got inactive\n");
828                 break;
829         case IMP_EVENT_ACTIVE:
830                 d->opd_imp_active = 1;
831                 if (d->opd_got_disconnected)
832                         d->opd_new_connection = 1;
833                 d->opd_imp_connected = 1;
834                 d->opd_imp_seen_connected = 1;
835                 if (is_osp_on_ost(d->opd_obd->obd_name))
836                         break;
837                 cfs_waitq_signal(&d->opd_pre_waitq);
838                 __osp_sync_check_for_work(d);
839                 CDEBUG(D_HA, "got connected\n");
840                 break;
841         case IMP_EVENT_INVALIDATE:
842                 if (obd->obd_namespace == NULL)
843                         break;
844                 ldlm_namespace_cleanup(obd->obd_namespace, LDLM_FL_LOCAL_ONLY);
845                 break;
846         case IMP_EVENT_OCD:
847         case IMP_EVENT_DEACTIVATE:
848         case IMP_EVENT_ACTIVATE:
849                 break;
850         default:
851                 CERROR("%s: unsupported import event: %#x\n",
852                        obd->obd_name, event);
853         }
854         return 0;
855 }
856
857 static int osp_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
858                          void *karg, void *uarg)
859 {
860         struct obd_device       *obd = exp->exp_obd;
861         struct osp_device       *d;
862         struct obd_ioctl_data   *data = karg;
863         int                      rc = 0;
864
865         ENTRY;
866
867         LASSERT(obd->obd_lu_dev);
868         d = lu2osp_dev(obd->obd_lu_dev);
869         LASSERT(d->opd_dt_dev.dd_ops == &osp_dt_ops);
870
871         if (!cfs_try_module_get(THIS_MODULE)) {
872                 CERROR("%s: can't get module. Is it alive?", obd->obd_name);
873                 return -EINVAL;
874         }
875
876         switch (cmd) {
877         case OBD_IOC_CLIENT_RECOVER:
878                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
879                                            data->ioc_inlbuf1, 0);
880                 if (rc > 0)
881                         rc = 0;
882                 break;
883         case IOC_OSC_SET_ACTIVE:
884                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
885                                               data->ioc_offset);
886                 break;
887         case OBD_IOC_PING_TARGET:
888                 rc = ptlrpc_obd_ping(obd);
889                 break;
890         default:
891                 CERROR("%s: unrecognized ioctl %#x by %s\n", obd->obd_name,
892                        cmd, cfs_curproc_comm());
893                 rc = -ENOTTY;
894         }
895         cfs_module_put(THIS_MODULE);
896         return rc;
897 }
898
899 static int osp_obd_health_check(const struct lu_env *env,
900                                 struct obd_device *obd)
901 {
902         struct osp_device *d = lu2osp_dev(obd->obd_lu_dev);
903
904         ENTRY;
905
906         /*
907          * 1.8/2.0 behaviour is that OST being connected once at least
908          * is considired "healthy". and one "healty" OST is enough to
909          * allow lustre clients to connect to MDS
910          */
911         LASSERT(d);
912         RETURN(!d->opd_imp_seen_connected);
913 }
914
915 /* context key constructor/destructor: mdt_key_init, mdt_key_fini */
916 LU_KEY_INIT_FINI(osp, struct osp_thread_info);
917 static void osp_key_exit(const struct lu_context *ctx,
918                          struct lu_context_key *key, void *data)
919 {
920         struct osp_thread_info *info = data;
921
922         info->osi_attr.la_valid = 0;
923 }
924
925 struct lu_context_key osp_thread_key = {
926         .lct_tags = LCT_MD_THREAD,
927         .lct_init = osp_key_init,
928         .lct_fini = osp_key_fini,
929         .lct_exit = osp_key_exit
930 };
931
932 /* context key constructor/destructor: mdt_txn_key_init, mdt_txn_key_fini */
933 LU_KEY_INIT_FINI(osp_txn, struct osp_txn_info);
934
935 struct lu_context_key osp_txn_key = {
936         .lct_tags = LCT_OSP_THREAD,
937         .lct_init = osp_txn_key_init,
938         .lct_fini = osp_txn_key_fini
939 };
940 LU_TYPE_INIT_FINI(osp, &osp_thread_key, &osp_txn_key);
941
942 static struct lu_device_type_operations osp_device_type_ops = {
943         .ldto_init           = osp_type_init,
944         .ldto_fini           = osp_type_fini,
945
946         .ldto_start          = osp_type_start,
947         .ldto_stop           = osp_type_stop,
948
949         .ldto_device_alloc   = osp_device_alloc,
950         .ldto_device_free    = osp_device_free,
951
952         .ldto_device_fini    = osp_device_fini
953 };
954
955 static struct lu_device_type osp_device_type = {
956         .ldt_tags     = LU_DEVICE_DT,
957         .ldt_name     = LUSTRE_OSP_NAME,
958         .ldt_ops      = &osp_device_type_ops,
959         .ldt_ctx_tags = LCT_MD_THREAD
960 };
961
962 static struct obd_ops osp_obd_device_ops = {
963         .o_owner        = THIS_MODULE,
964         .o_add_conn     = client_import_add_conn,
965         .o_del_conn     = client_import_del_conn,
966         .o_reconnect    = osp_reconnect,
967         .o_connect      = osp_obd_connect,
968         .o_disconnect   = osp_obd_disconnect,
969         .o_health_check = osp_obd_health_check,
970         .o_import_event = osp_import_event,
971         .o_iocontrol    = osp_iocontrol,
972         .o_statfs       = osp_obd_statfs,
973 };
974
975 struct llog_operations osp_mds_ost_orig_logops;
976
977 static int __init osp_mod_init(void)
978 {
979         struct lprocfs_static_vars       lvars;
980         cfs_proc_dir_entry_t            *osc_proc_dir;
981         int                              rc;
982
983         rc = lu_kmem_init(osp_caches);
984         if (rc)
985                 return rc;
986
987         lprocfs_osp_init_vars(&lvars);
988
989         rc = class_register_type(&osp_obd_device_ops, NULL, lvars.module_vars,
990                                  LUSTRE_OSP_NAME, &osp_device_type);
991
992         /* create "osc" entry in procfs for compatibility purposes */
993         if (rc != 0) {
994                 lu_kmem_fini(osp_caches);
995                 return rc;
996         }
997
998         /* Note: add_rec/delcare_add_rec will be only used by catalogs */
999         osp_mds_ost_orig_logops = llog_osd_ops;
1000         osp_mds_ost_orig_logops.lop_add = llog_cat_add_rec;
1001         osp_mds_ost_orig_logops.lop_declare_add = llog_cat_declare_add_rec;
1002
1003         osc_proc_dir = lprocfs_srch(proc_lustre_root, "osc");
1004         if (osc_proc_dir == NULL) {
1005                 osc_proc_dir = lprocfs_register("osc", proc_lustre_root, NULL,
1006                                                 NULL);
1007                 if (IS_ERR(osc_proc_dir))
1008                         CERROR("osp: can't create compat entry \"osc\": %d\n",
1009                                (int) PTR_ERR(osc_proc_dir));
1010         }
1011         return rc;
1012 }
1013
1014 static void __exit osp_mod_exit(void)
1015 {
1016         lprocfs_try_remove_proc_entry("osc", proc_lustre_root);
1017
1018         class_unregister_type(LUSTRE_OSP_NAME);
1019         lu_kmem_fini(osp_caches);
1020 }
1021
1022 MODULE_AUTHOR("Intel, Inc. <http://www.intel.com/>");
1023 MODULE_DESCRIPTION("Lustre OST Proxy Device ("LUSTRE_OSP_NAME")");
1024 MODULE_LICENSE("GPL");
1025
1026 cfs_module(osp, LUSTRE_VERSION_STRING, osp_mod_init, osp_mod_exit);