Whamcloud - gitweb
LU-17744 ldiskfs: mballoc stats fixes
[fs/lustre-release.git] / lustre / osp / osp_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/osp/osp_dev.c
32  *
33  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
34  * Author: Mikhail Pershin <mike.pershin@intel.com>
35  * Author: Di Wang <di.wang@intel.com>
36  */
37 /*
38  * The Object Storage Proxy (OSP) module provides an implementation of
39  * the DT API for remote MDTs and OSTs. Every local OSP device (or
40  * object) is a proxy for a remote OSD device (or object). Thus OSP
41  * converts DT operations into RPCs, which are sent to the OUT service
42  * on a remote target, converted back to DT operations, and
43  * executed. Of course there are many ways in which this description
44  * is inaccurate but it's a good enough mental model. OSP is used by
45  * the MDT stack in several ways:
46  *
47  * - OSP devices allocate FIDs for the stripe sub-objects of a striped
48  *   file or directory.
49  *
50  * - OSP objects represent the remote MDT and OST objects that are
51  *   the stripes of a striped object.
52  *
53  * - OSP devices log, send, and track synchronous operations (setattr
54  *   and unlink) to remote targets.
55  *
56  * - OSP objects are the bottom slice of the compound LU object
57  *   representing a remote MDT object: MDT/MDD/LOD/OSP.
58  *
59  * - OSP objects are used by LFSCK to represent remote OST objects
60  *   during the verification of MDT-OST consistency.
61  *
62  * - OSP devices batch idempotent requests (declare_attr_get() and
63  *   declare_xattr_get()) to the remote target and cache their results.
64  *
65  * In addition the OSP layer implements a subset of the OBD device API
66  * to support being a client of a remote target, connecting to other
67  * layers, and FID allocation.
68  */
69
70 #define DEBUG_SUBSYSTEM S_MDS
71
72 #include <linux/kthread.h>
73
74 #include <uapi/linux/lustre/lustre_ioctl.h>
75 #include <lustre_ioctl_old.h>
76 #include <lustre_log.h>
77 #include <lustre_obdo.h>
78 #include <uapi/linux/lustre/lustre_param.h>
79 #include <obd_class.h>
80
81 #include "osp_internal.h"
82
83 /* Slab for OSP object allocation */
84 struct kmem_cache *osp_object_kmem;
85
86 static struct lu_kmem_descr osp_caches[] = {
87         {
88                 .ckd_cache = &osp_object_kmem,
89                 .ckd_name  = "osp_obj",
90                 .ckd_size  = sizeof(struct osp_object)
91         },
92         {
93                 .ckd_cache = NULL
94         }
95 };
96
97 /**
98  * Implementation of lu_device_operations::ldo_object_alloc
99  *
100  * Allocates an OSP object in memory, whose FID is on the remote target.
101  *
102  * \param[in] env       execution environment
103  * \param[in] hdr       The header of the object stack. If it is NULL, it
104  *                      means the object is not built from top device, i.e.
105  *                      it is a sub-stripe object of striped directory or
106  *                      an OST object.
107  * \param[in] d         OSP device
108  *
109  * \retval object       object being created if the creation succeed.
110  * \retval NULL         NULL if the creation failed.
111  */
112 static struct lu_object *osp_object_alloc(const struct lu_env *env,
113                                           const struct lu_object_header *hdr,
114                                           struct lu_device *d)
115 {
116         struct osp_object *o;
117
118         OBD_SLAB_ALLOC_PTR_GFP(o, osp_object_kmem, GFP_NOFS);
119         if (o != NULL) {
120                 struct lu_object *l = &o->opo_obj.do_lu;
121
122                 /* If hdr is NULL, it means the object is not built
123                  * from the top dev(MDT/OST), usually it happens when
124                  * building striped object, like data object on MDT or
125                  * striped object for directory */
126                 if (hdr == NULL) {
127                         struct lu_object_header *h = &o->opo_header;
128
129                         lu_object_header_init(h);
130                         dt_object_init(&o->opo_obj, h, d);
131                         lu_object_add_top(h, l);
132                 } else {
133                         dt_object_init(&o->opo_obj, NULL, d);
134                 }
135
136                 l->lo_ops = &osp_lu_obj_ops;
137
138                 init_rwsem(&o->opo_sem);
139                 INIT_LIST_HEAD(&o->opo_xattr_list);
140                 INIT_LIST_HEAD(&o->opo_invalidate_cb_list);
141                 spin_lock_init(&o->opo_lock);
142                 init_rwsem(&o->opo_invalidate_sem);
143
144                 return l;
145         }
146
147         return NULL;
148 }
149
150 /**
151  * Find or create the local object
152  *
153  * Finds or creates the local file referenced by \a reg_id and return the
154  * attributes of the local file.
155  *
156  * \param[in] env       execution environment
157  * \param[in] osp       OSP device
158  * \param[out] attr     attributes of the object
159  * \param[in] reg_id    the local object ID of the file. It will be used
160  *                      to compose a local FID{FID_SEQ_LOCAL_FILE, reg_id, 0}
161  *                      to identify the object.
162  *
163  * \retval object               object(dt_object) found or created
164  * \retval ERR_PTR(errno)       ERR_PTR(errno) if not get the object.
165  */
166 static struct dt_object
167 *osp_find_or_create_local_file(const struct lu_env *env, struct osp_device *osp,
168                                struct lu_attr *attr, __u32 reg_id)
169 {
170         struct osp_thread_info *osi = osp_env_info(env);
171         struct dt_object_format dof = { 0 };
172         struct dt_object       *dto;
173         int                  rc;
174         ENTRY;
175
176         lu_local_obj_fid(&osi->osi_fid, reg_id);
177         attr->la_valid = LA_MODE;
178         attr->la_mode = S_IFREG | 0644;
179         dof.dof_type = DFT_REGULAR;
180         /* Find or create the local object by osi_fid. */
181         dto = dt_find_or_create(env, osp->opd_storage, &osi->osi_fid,
182                                 &dof, attr);
183         if (IS_ERR(dto))
184                 RETURN(dto);
185
186         /* Get attributes of the local object. */
187         rc = dt_attr_get(env, dto, attr);
188         if (rc) {
189                 CERROR("%s: can't be initialized: rc = %d\n",
190                        osp->opd_obd->obd_name, rc);
191                 dt_object_put(env, dto);
192                 RETURN(ERR_PTR(rc));
193         }
194         RETURN(dto);
195 }
196
197 /**
198  * Write data buffer to a local file object.
199  *
200  * \param[in] env       execution environment
201  * \param[in] osp       OSP device
202  * \param[in] dt_obj    object written to
203  * \param[in] buf       buffer containing byte array and length
204  * \param[in] offset    write offset in the object in bytes
205  *
206  * \retval 0            0 if write succeed
207  * \retval -EFAULT      -EFAULT if only part of buffer is written.
208  * \retval negative             other negative errno if write failed.
209  */
210 static int osp_write_local_file(const struct lu_env *env,
211                                 struct osp_device *osp,
212                                 struct dt_object *dt_obj,
213                                 struct lu_buf *buf,
214                                 loff_t offset)
215 {
216         struct thandle *th;
217         int rc;
218
219         if (osp->opd_storage->dd_rdonly)
220                 RETURN(0);
221
222         th = dt_trans_create(env, osp->opd_storage);
223         if (IS_ERR(th))
224                 RETURN(PTR_ERR(th));
225
226         rc = dt_declare_record_write(env, dt_obj, buf, offset, th);
227         if (rc)
228                 GOTO(out, rc);
229         rc = dt_trans_start_local(env, osp->opd_storage, th);
230         if (rc)
231                 GOTO(out, rc);
232
233         rc = dt_record_write(env, dt_obj, buf, &offset, th);
234 out:
235         dt_trans_stop(env, osp->opd_storage, th);
236         RETURN(rc);
237 }
238
239 /**
240  * Initialize last ID object.
241  *
242  * This function initializes the LAST_ID file, which stores the current last
243  * used id of data objects. The MDT will use the last used id and the last_seq
244  * (\see osp_init_last_seq()) to synchronize the precreate object cache with
245  * OSTs.
246  *
247  * \param[in] env       execution environment
248  * \param[in] osp       OSP device
249  *
250  * \retval 0            0 if initialization succeed
251  * \retval negative     negative errno if initialization failed
252  */
253 static int osp_init_last_objid(const struct lu_env *env, struct osp_device *osp)
254 {
255         struct osp_thread_info  *osi = osp_env_info(env);
256         struct lu_fid           *fid = &osp->opd_last_used_fid;
257         struct dt_object        *dto;
258         int                     rc = -EFAULT;
259         ENTRY;
260
261         dto = osp_find_or_create_local_file(env, osp, &osi->osi_attr,
262                                             MDD_LOV_OBJ_OID);
263         if (IS_ERR(dto))
264                 RETURN(PTR_ERR(dto));
265
266         osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, &osp->opd_last_id,
267                            osp->opd_index);
268
269         /* object will be released in device cleanup path */
270         if (osi->osi_attr.la_size >= (osi->osi_off + osi->osi_lb.lb_len)) {
271                 rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off);
272                 if (rc != 0 && rc != -EFAULT)
273                         GOTO(out, rc);
274                 /* In case of idif bits 32-48 go to f_seq
275                  * (see osp_init_last_seq). So don't care
276                  * about u64->u32 convertion. */
277                 fid->f_oid = osp->opd_last_id;
278         }
279
280         if (rc == -EFAULT) { /* fresh LAST_ID */
281                 osp->opd_last_id = 0;
282                 fid->f_oid = 0;
283                 rc = osp_write_local_file(env, osp, dto, &osi->osi_lb,
284                                           osi->osi_off);
285                 if (rc != 0)
286                         GOTO(out, rc);
287         }
288         osp->opd_last_used_oid_file = dto;
289         RETURN(0);
290 out:
291         /* object will be released in device cleanup path */
292         CERROR("%s: can't initialize lov_objid: rc = %d\n",
293                osp->opd_obd->obd_name, rc);
294         dt_object_put(env, dto);
295         osp->opd_last_used_oid_file = NULL;
296         RETURN(rc);
297 }
298
299 /**
300  * Initialize last sequence object.
301  *
302  * This function initializes the LAST_SEQ file in the local OSD, which stores
303  * the current last used sequence of data objects. The MDT will use the last
304  * sequence and last id (\see osp_init_last_objid()) to synchronize the
305  * precreate object cache with OSTs.
306  *
307  * \param[in] env       execution environment
308  * \param[in] osp       OSP device
309  *
310  * \retval 0            0 if initialization succeed
311  * \retval negative     negative errno if initialization failed
312  */
313 static int osp_init_last_seq(const struct lu_env *env, struct osp_device *osp)
314 {
315         struct osp_thread_info  *osi = osp_env_info(env);
316         struct lu_fid           *fid = &osp->opd_last_used_fid;
317         struct dt_object        *dto;
318         int                     rc = -EFAULT;
319         ENTRY;
320
321         dto = osp_find_or_create_local_file(env, osp, &osi->osi_attr,
322                                             MDD_LOV_OBJ_OSEQ);
323         if (IS_ERR(dto))
324                 RETURN(PTR_ERR(dto));
325
326         osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_seq,
327                            osp->opd_index);
328
329         /* object will be released in device cleanup path */
330         if (osi->osi_attr.la_size >= (osi->osi_off + osi->osi_lb.lb_len)) {
331                 rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off);
332                 if (rc != 0 && rc != -EFAULT)
333                         GOTO(out, rc);
334                 if (fid_is_idif(fid))
335                         fid->f_seq = fid_idif_seq(osp->opd_last_id,
336                                                   osp->opd_index);
337         }
338
339         if (rc == -EFAULT) { /* fresh OSP */
340                 fid->f_seq = 0;
341                 rc = osp_write_local_file(env, osp, dto, &osi->osi_lb,
342                                           osi->osi_off);
343                 if (rc != 0)
344                         GOTO(out, rc);
345         }
346         osp->opd_last_used_seq_file = dto;
347         RETURN(0);
348 out:
349         /* object will be released in device cleanup path */
350         CERROR("%s: can't initialize lov_seq: rc = %d\n",
351                osp->opd_obd->obd_name, rc);
352         dt_object_put(env, dto);
353         osp->opd_last_used_seq_file = NULL;
354         RETURN(rc);
355 }
356
357 /**
358  * Initialize last OID and sequence object.
359  *
360  * If the MDT is just upgraded to 2.4 from the lower version, where the
361  * LAST_SEQ file does not exist, the file will be created and IDIF sequence
362  * will be written into the file.
363  *
364  * \param[in] env       execution environment
365  * \param[in] osp       OSP device
366  *
367  * \retval 0            0 if initialization succeed
368  * \retval negative     negative error if initialization failed
369  */
370 static int osp_last_used_init(const struct lu_env *env, struct osp_device *osp)
371 {
372         struct osp_thread_info *osi = osp_env_info(env);
373         int                  rc;
374         ENTRY;
375
376         fid_zero(&osp->opd_last_used_fid);
377         rc = osp_init_last_objid(env, osp);
378         if (rc < 0) {
379                 CERROR("%s: Can not get ids %d from old objid!\n",
380                        osp->opd_obd->obd_name, rc);
381                 RETURN(rc);
382         }
383
384         rc = osp_init_last_seq(env, osp);
385         if (rc < 0) {
386                 CERROR("%s: Can not get sequence %d from old objseq!\n",
387                        osp->opd_obd->obd_name, rc);
388                 GOTO(out, rc);
389         }
390
391         if (fid_oid(&osp->opd_last_used_fid) != 0 &&
392             fid_seq(&osp->opd_last_used_fid) == 0) {
393                 /* Just upgrade from the old version,
394                  * set the seq to be IDIF */
395                 osp->opd_last_used_fid.f_seq =
396                    fid_idif_seq(fid_oid(&osp->opd_last_used_fid),
397                                 osp->opd_index);
398                 osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off,
399                                     &osp->opd_last_used_fid.f_seq,
400                                     osp->opd_index);
401                 rc = osp_write_local_file(env, osp, osp->opd_last_used_seq_file,
402                                           &osi->osi_lb, osi->osi_off);
403                 if (rc) {
404                         CERROR("%s : Can not write seq file: rc = %d\n",
405                                osp->opd_obd->obd_name, rc);
406                         GOTO(out, rc);
407                 }
408         }
409
410         if (!fid_is_zero(&osp->opd_last_used_fid) &&
411                  !fid_is_sane(&osp->opd_last_used_fid)) {
412                 CERROR("%s: Got invalid FID "DFID"\n", osp->opd_obd->obd_name,
413                         PFID(&osp->opd_last_used_fid));
414                 GOTO(out, rc = -EINVAL);
415         }
416
417         osp_fid_to_obdid(&osp->opd_last_used_fid, &osp->opd_last_id);
418         CDEBUG(D_INFO, "%s: Init last used fid "DFID"\n",
419                osp->opd_obd->obd_name, PFID(&osp->opd_last_used_fid));
420 out:
421         if (rc != 0) {
422                 if (osp->opd_last_used_oid_file != NULL) {
423                         dt_object_put(env, osp->opd_last_used_oid_file);
424                         osp->opd_last_used_oid_file = NULL;
425                 }
426                 if (osp->opd_last_used_seq_file != NULL) {
427                         dt_object_put(env, osp->opd_last_used_seq_file);
428                         osp->opd_last_used_seq_file = NULL;
429                 }
430         }
431
432         RETURN(rc);
433 }
434
435 /**
436  * Release the last sequence and OID file objects in OSP device.
437  *
438  * \param[in] env       execution environment
439  * \param[in] osp       OSP device
440  */
441 static void osp_last_used_fini(const struct lu_env *env, struct osp_device *osp)
442 {
443         /* release last_used file */
444         if (osp->opd_last_used_oid_file != NULL) {
445                 dt_object_put(env, osp->opd_last_used_oid_file);
446                 osp->opd_last_used_oid_file = NULL;
447         }
448
449         if (osp->opd_last_used_seq_file != NULL) {
450                 dt_object_put(env, osp->opd_last_used_seq_file);
451                 osp->opd_last_used_seq_file = NULL;
452         }
453 }
454
455 /**
456  * Disconnects the connection between OSP and its correspondent MDT or OST, and
457  * the import will be marked as inactive. It will only be called during OSP
458  * cleanup process.
459  *
460  * \param[in] d         OSP device being disconnected
461  *
462  * \retval 0            0 if disconnection succeed
463  * \retval negative     negative errno if disconnection failed
464  */
465 static int osp_disconnect(struct osp_device *d)
466 {
467         struct obd_device *obd = d->opd_obd;
468         struct obd_import *imp;
469         int rc = 0;
470
471         imp = obd->u.cli.cl_import;
472
473         /* Mark import deactivated now, so we don't try to reconnect if any
474          * of the cleanup RPCs fails (e.g. ldlm cancel, etc).  We don't
475          * fully deactivate the import, or that would drop all requests. */
476         LASSERT(imp != NULL);
477         spin_lock(&imp->imp_lock);
478         imp->imp_deactive = 1;
479         spin_unlock(&imp->imp_lock);
480
481         ptlrpc_deactivate_import(imp);
482
483         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
484          * delete it regardless.  (It's safe to delete an import that was
485          * never added.) */
486         (void)ptlrpc_pinger_del_import(imp);
487
488         /* Send disconnect on healthy import, do force disconnect otherwise */
489         spin_lock(&imp->imp_lock);
490         imp->imp_obd->obd_force = imp->imp_state != LUSTRE_IMP_FULL;
491         spin_unlock(&imp->imp_lock);
492
493         rc = ptlrpc_disconnect_import(imp, 0);
494         if (rc != 0)
495                 CERROR("%s: can't disconnect: rc = %d\n", obd->obd_name, rc);
496
497         ptlrpc_invalidate_import(imp);
498
499         RETURN(rc);
500 }
501
502 /**
503  * Initialize the osp_update structure in OSP device
504  *
505  * Allocate osp update structure and start update thread.
506  *
507  * \param[in] osp       OSP device
508  *
509  * \retval              0 if initialization succeeds.
510  * \retval              negative errno if initialization fails.
511  */
512 static int osp_update_init(struct osp_device *osp)
513 {
514         struct task_struct *task;
515         int rc;
516
517         ENTRY;
518
519         LASSERT(osp->opd_connect_mdt);
520
521         if (osp->opd_storage->dd_rdonly)
522                 RETURN(0);
523
524         OBD_ALLOC_PTR(osp->opd_update);
525         if (osp->opd_update == NULL)
526                 RETURN(-ENOMEM);
527
528         init_waitqueue_head(&osp->opd_update->ou_waitq);
529         spin_lock_init(&osp->opd_update->ou_lock);
530         INIT_LIST_HEAD(&osp->opd_update->ou_list);
531         osp->opd_update->ou_rpc_version = 1;
532         osp->opd_update->ou_version = 1;
533         osp->opd_update->ou_generation = 0;
534
535         rc = lu_env_init(&osp->opd_update->ou_env,
536                          osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
537         if (rc < 0) {
538                 CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
539                        rc);
540                 OBD_FREE_PTR(osp->opd_update);
541                 osp->opd_update = NULL;
542                 RETURN(rc);
543         }
544         /* start thread handling sending updates to the remote MDT */
545         task = kthread_create(osp_send_update_thread, osp,
546                               "osp_up%u-%u", osp->opd_index, osp->opd_group);
547         if (IS_ERR(task)) {
548                 int rc = PTR_ERR(task);
549
550                 lu_env_fini(&osp->opd_update->ou_env);
551                 OBD_FREE_PTR(osp->opd_update);
552                 osp->opd_update = NULL;
553                 CERROR("%s: can't start precreate thread: rc = %d\n",
554                        osp->opd_obd->obd_name, rc);
555                 RETURN(rc);
556         }
557
558         osp->opd_update->ou_update_task = task;
559         wake_up_process(task);
560
561         RETURN(0);
562 }
563
564 /**
565  * Finialize osp_update structure in OSP device
566  *
567  * Stop the OSP update sending thread, then delete the left
568  * osp thandle in the sending list.
569  *
570  * \param [in] osp      OSP device.
571  */
572 static void osp_update_fini(const struct lu_env *env, struct osp_device *osp)
573 {
574         struct obd_import *imp = osp->opd_obd->u.cli.cl_import;
575         struct osp_update_request *our;
576         struct osp_update_request *tmp;
577         struct osp_updates *ou = osp->opd_update;
578
579         if (ou == NULL)
580                 return;
581
582         kthread_stop(ou->ou_update_task);
583         lu_env_fini(&ou->ou_env);
584
585         /*
586          * import invalidation can be running in a dedicated thread.
587          * we have to wait for the thread's completion as the thread
588          * invalidates this list as well.
589          */
590         wait_event_idle(imp->imp_recovery_waitq,
591                         (atomic_read(&imp->imp_inval_count) == 0));
592
593         /* Remove the left osp thandle from the list */
594         list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
595                 list_del_init(&our->our_list);
596                 LASSERT(our->our_th != NULL);
597                 osp_trans_callback(env, our->our_th, -EIO);
598                 /* our will be destroyed in osp_thandle_put() */
599                 osp_thandle_put(env, our->our_th);
600         }
601
602         OBD_FREE_PTR(ou);
603         osp->opd_update = NULL;
604 }
605
606 /**
607  * Cleanup OSP, which includes disconnect import, cleanup unlink log, stop
608  * precreate threads etc.
609  *
610  * \param[in] env       execution environment.
611  * \param[in] d         OSP device being disconnected.
612  *
613  * \retval 0            0 if cleanup succeed
614  * \retval negative     negative errno if cleanup failed
615  */
616 static int osp_shutdown(const struct lu_env *env, struct osp_device *d)
617 {
618         int                      rc = 0;
619         ENTRY;
620
621         LASSERT(env);
622
623         rc = osp_disconnect(d);
624
625         osp_statfs_fini(d);
626
627         if (!d->opd_connect_mdt) {
628                 /* stop sync thread */
629                 osp_sync_fini(d);
630
631                 /* stop precreate thread */
632                 osp_precreate_fini(d);
633
634                 /* release last_used file */
635                 osp_last_used_fini(env, d);
636         }
637
638         obd_fid_fini(d->opd_obd);
639
640         RETURN(rc);
641 }
642
643 /**
644  * Implementation of osp_lu_ops::ldo_process_config
645  *
646  * This function processes config log records in OSP layer. It is usually
647  * called from the top layer of MDT stack, and goes through the stack by calling
648  * ldo_process_config of next layer.
649  *
650  * \param[in] env       execution environment
651  * \param[in] dev       lu_device of OSP
652  * \param[in] lcfg      config log
653  *
654  * \retval 0            0 if the config log record is executed correctly.
655  * \retval negative     negative errno if the record execution fails.
656  */
657 static int osp_process_config(const struct lu_env *env,
658                               struct lu_device *dev, struct lustre_cfg *lcfg)
659 {
660         struct osp_device *d = lu2osp_dev(dev);
661         struct dt_device *dt = lu2dt_dev(dev);
662         struct obd_device *obd = d->opd_obd;
663         ssize_t count;
664         int rc;
665
666         ENTRY;
667
668         switch (lcfg->lcfg_command) {
669         case LCFG_PRE_CLEANUP:
670                 rc = osp_disconnect(d);
671                 osp_update_fini(env, d);
672                 break;
673         case LCFG_CLEANUP:
674                 /*
675                  * cleanup ldlm so that PRE_CLEANUP phase doesn't block
676                  * awaiting for locks held by MDT threads awaiting for
677                  * all OSPs to interrupt their in-flight RPCs
678                  */
679                 if (obd->obd_namespace != NULL)
680                         ldlm_namespace_free_prior(obd->obd_namespace, NULL, 1);
681                 lu_dev_del_linkage(dev->ld_site, dev);
682                 rc = osp_shutdown(env, d);
683                 break;
684         case LCFG_PARAM:
685                 count = class_modify_config(lcfg, d->opd_connect_mdt ?
686                                                   PARAM_OSP : PARAM_OSC,
687                                             &dt->dd_kobj);
688                 if (count < 0) {
689                         /* class_modify_config() haven't found matching
690                          * parameter and returned an error so that layer(s)
691                          * below could use that. But OSP is the bottom, so
692                          * just ignore it
693                          */
694                         CERROR("%s: unknown param %s\n",
695                                (char *)lustre_cfg_string(lcfg, 0),
696                                (char *)lustre_cfg_string(lcfg, 1));
697                 }
698                 rc = 0;
699                 break;
700         default:
701                 CERROR("%s: unknown command %u\n",
702                        (char *)lustre_cfg_string(lcfg, 0), lcfg->lcfg_command);
703                 rc = 0;
704                 break;
705         }
706
707         RETURN(rc);
708 }
709
710 /**
711  * Implementation of osp_lu_ops::ldo_recovery_complete
712  *
713  * This function is called after recovery is finished, and OSP layer
714  * will wake up precreate thread here.
715  *
716  * \param[in] env       execution environment
717  * \param[in] dev       lu_device of OSP
718  *
719  * \retval 0            0 unconditionally
720  */
721 static int osp_recovery_complete(const struct lu_env *env,
722                                  struct lu_device *dev)
723 {
724         struct osp_device       *osp = lu2osp_dev(dev);
725
726         ENTRY;
727         osp->opd_recovery_completed = 1;
728
729         if (!osp->opd_connect_mdt && osp->opd_pre != NULL)
730                 wake_up(&osp->opd_pre_waitq);
731
732         RETURN(0);
733 }
734
735 /**
736  * Implementation of lu_device_operations::ldo_fid_alloc() for OSP
737  *
738  * Allocate FID from remote MDT.
739  *
740  * see include/lu_object.h for the details.
741  */
742 static int osp_fid_alloc(const struct lu_env *env, struct lu_device *d,
743                          struct lu_fid *fid, struct lu_object *parent,
744                          const struct lu_name *name)
745 {
746         struct osp_device *osp = lu2osp_dev(d);
747         struct client_obd *cli = &osp->opd_obd->u.cli;
748         struct lu_client_seq *seq = cli->cl_seq;
749         int rc;
750
751         ENTRY;
752
753         /* Sigh, fid client is not ready yet */
754         if (!osp->opd_obd->u.cli.cl_seq)
755                 RETURN(-ENOTCONN);
756
757         if (!osp->opd_obd->u.cli.cl_seq->lcs_exp)
758                 RETURN(-ENOTCONN);
759
760         rc = seq_client_alloc_fid(env, seq, fid);
761
762         RETURN(rc);
763 }
764
765 const struct lu_device_operations osp_lu_ops = {
766         .ldo_object_alloc       = osp_object_alloc,
767         .ldo_process_config     = osp_process_config,
768         .ldo_recovery_complete  = osp_recovery_complete,
769         .ldo_fid_alloc          = osp_fid_alloc,
770 };
771
772 /**
773  * Implementation of dt_device_operations::dt_statfs
774  *
775  * This function provides statfs status (for precreation) from
776  * corresponding OST. Note: this function only retrieves the status
777  * from the OSP device, and the real statfs RPC happens inside
778  * precreate thread (\see osp_statfs_update). Note: OSP for MDT does
779  * not need to retrieve statfs data for now.
780  *
781  * \param[in] env       execution environment.
782  * \param[in] dev       dt_device of OSP.
783  * \param[out] sfs      holds the retrieved statfs data.
784  *
785  * \retval 0            0 statfs data was retrieved successfully or
786  *                      retrieval was not needed
787  * \retval negative     negative errno if get statfs failed.
788  */
789 static int osp_statfs(const struct lu_env *env, struct dt_device *dev,
790                       struct obd_statfs *sfs, struct obd_statfs_info *info)
791 {
792         struct osp_device *d = dt2osp_dev(dev);
793         struct obd_import *imp = d->opd_obd->u.cli.cl_import;
794
795         ENTRY;
796
797         if (imp->imp_state == LUSTRE_IMP_CLOSED)
798                 RETURN(-ESHUTDOWN);
799
800         if (unlikely(d->opd_imp_active == 0))
801                 RETURN(-ENOTCONN);
802
803         /* return recently updated data */
804         *sfs = d->opd_statfs;
805         if (info) {
806                 info->os_reserved_mb_low = d->opd_reserved_mb_low;
807                 info->os_reserved_mb_high = d->opd_reserved_mb_high;
808         }
809
810         CDEBUG(D_OTHER,
811                "%s: blocks=%llu, bfree=%llu, bavail=%llu, bsize=%u, reserved_mb_low=%u, reserved_mb_high=%u, files=%llu, ffree=%llu, state=%x\n",
812                d->opd_obd->obd_name,
813                sfs->os_blocks, sfs->os_bfree, sfs->os_bavail, sfs->os_bsize,
814                d->opd_reserved_mb_low, d->opd_reserved_mb_high,
815                sfs->os_files, sfs->os_ffree, sfs->os_state);
816
817         if (d->opd_pre == NULL || (info && !info->os_enable_pre))
818                 RETURN(0);
819
820         /*
821          * The layer above osp (usually lod) can use f_precreated to
822          * estimate how many objects are available for immediate usage.
823          */
824         spin_lock(&d->opd_pre_lock);
825         sfs->os_fprecreated = osp_fid_diff(&d->opd_pre_last_created_fid,
826                                            &d->opd_pre_used_fid);
827         sfs->os_fprecreated -= d->opd_pre_reserved;
828         LASSERTF(sfs->os_fprecreated <= OST_MAX_PRECREATE * 2,
829                  "last_created "DFID", next_fid "DFID", reserved %llu\n",
830                  PFID(&d->opd_pre_last_created_fid), PFID(&d->opd_pre_used_fid),
831                  d->opd_pre_reserved);
832         spin_unlock(&d->opd_pre_lock);
833         RETURN(0);
834 }
835
836 /**
837  * Implementation of dt_device_operations::dt_sync
838  *
839  * This function synchronizes the OSP cache to the remote target. It wakes
840  * up unlink log threads and sends out unlink records to the remote OST.
841  *
842  * \param[in] env       execution environment
843  * \param[in] dev       dt_device of OSP
844  *
845  * \retval 0            0 if synchronization succeeds
846  * \retval negative     negative errno if synchronization fails
847  */
848 static int osp_sync(const struct lu_env *env, struct dt_device *dev)
849 {
850         struct osp_device *d = dt2osp_dev(dev);
851         time64_t start = ktime_get_seconds();
852         int recs, rc = 0;
853         u64 old;
854
855         ENTRY;
856
857         /* No Sync between MDTs yet. */
858         if (d->opd_connect_mdt)
859                 RETURN(0);
860
861         if (d->opd_storage->dd_rdonly)
862                 RETURN(0);
863
864         recs = atomic_read(&d->opd_sync_changes);
865         old = atomic64_read(&d->opd_sync_processed_recs);
866
867         osp_sync_force(env, dt2osp_dev(dev));
868
869         if (unlikely(d->opd_imp_active == 0))
870                 RETURN(-ENOTCONN);
871
872         down_write(&d->opd_async_updates_rwsem);
873
874         CDEBUG(D_OTHER, "%s: async updates %d\n", d->opd_obd->obd_name,
875                atomic_read(&d->opd_async_updates_count));
876
877         /* make sure the connection is fine */
878         rc = wait_event_idle_timeout(
879                 d->opd_sync_barrier_waitq,
880                 atomic_read(&d->opd_async_updates_count) == 0,
881                 cfs_time_seconds(obd_timeout));
882         if (rc > 0)
883                 rc = 0;
884         else if (rc == 0)
885                 rc = -ETIMEDOUT;
886
887         up_write(&d->opd_async_updates_rwsem);
888         if (rc != 0)
889                 GOTO(out, rc);
890
891         CDEBUG(D_CACHE, "%s: processed %llu\n", d->opd_obd->obd_name,
892                (unsigned long long)atomic64_read(&d->opd_sync_processed_recs));
893
894         while (atomic64_read(&d->opd_sync_processed_recs) < old + recs) {
895                 __u64 last = atomic64_read(&d->opd_sync_processed_recs);
896                 /* make sure the connection is fine */
897                 wait_event_idle_timeout(
898                         d->opd_sync_barrier_waitq,
899                         atomic64_read(&d->opd_sync_processed_recs)
900                              >= old + recs,
901                         cfs_time_seconds(obd_timeout));
902
903                 if (atomic64_read(&d->opd_sync_processed_recs) >= old + recs)
904                         break;
905
906                 if (atomic64_read(&d->opd_sync_processed_recs) != last) {
907                         /* some progress have been made,
908                          * keep trying... */
909                         continue;
910                 }
911
912                 /* no changes and expired, something is wrong */
913                 GOTO(out, rc = -ETIMEDOUT);
914         }
915
916         /* block new processing (barrier>0 - few callers are possible */
917         atomic_inc(&d->opd_sync_barrier);
918
919         CDEBUG(D_CACHE, "%s: %u in flight\n", d->opd_obd->obd_name,
920                atomic_read(&d->opd_sync_rpcs_in_flight));
921
922         /* wait till all-in-flight are replied, so executed by the target */
923         /* XXX: this is used by LFSCK at the moment, which doesn't require
924          *      all the changes to be committed, but in general it'd be
925          *      better to wait till commit */
926         while (atomic_read(&d->opd_sync_rpcs_in_flight) > 0) {
927                 old = atomic_read(&d->opd_sync_rpcs_in_flight);
928
929                 wait_event_idle_timeout(
930                         d->opd_sync_barrier_waitq,
931                         atomic_read(&d->opd_sync_rpcs_in_flight) == 0,
932                         cfs_time_seconds(obd_timeout));
933
934                 if (atomic_read(&d->opd_sync_rpcs_in_flight) == 0)
935                         break;
936
937                 if (atomic_read(&d->opd_sync_rpcs_in_flight) != old) {
938                         /* some progress have been made */
939                         continue;
940                 }
941
942                 /* no changes and expired, something is wrong */
943                 GOTO(out, rc = -ETIMEDOUT);
944         }
945
946 out:
947         /* resume normal processing (barrier=0) */
948         atomic_dec(&d->opd_sync_barrier);
949         osp_sync_check_for_work(d);
950
951         CDEBUG(D_CACHE, "%s: done in %lld: rc = %d\n", d->opd_obd->obd_name,
952                ktime_get_seconds() - start, rc);
953
954         RETURN(rc);
955 }
956
957 static const struct dt_device_operations osp_dt_ops = {
958         .dt_statfs       = osp_statfs,
959         .dt_sync         = osp_sync,
960         .dt_trans_create = osp_trans_create,
961         .dt_trans_start  = osp_trans_start,
962         .dt_trans_stop   = osp_trans_stop,
963         .dt_trans_cb_add   = osp_trans_cb_add,
964 };
965
966 /**
967  * Connect OSP to local OSD.
968  *
969  * Locate the local OSD referenced by \a nextdev and connect to it. Sometimes,
970  * OSP needs to access the local OSD to store some information. For example,
971  * during precreate, it needs to update last used OID and sequence file
972  * (LAST_SEQ) in local OSD.
973  *
974  * \param[in] env       execution environment
975  * \param[in] osp       OSP device
976  * \param[in] nextdev   the name of local OSD
977  *
978  * \retval 0            0 connection succeeded
979  * \retval negative     negative errno connection failed
980  */
981 static int osp_connect_to_osd(const struct lu_env *env, struct osp_device *osp,
982                               const char *nextdev)
983 {
984         struct obd_connect_data *data = NULL;
985         struct obd_device       *obd;
986         int                      rc;
987
988         ENTRY;
989
990         LASSERT(osp->opd_storage_exp == NULL);
991
992         OBD_ALLOC_PTR(data);
993         if (data == NULL)
994                 RETURN(-ENOMEM);
995
996         obd = class_name2obd(nextdev);
997         if (obd == NULL) {
998                 CERROR("%s: can't locate next device: %s\n",
999                        osp->opd_obd->obd_name, nextdev);
1000                 GOTO(out, rc = -ENOTCONN);
1001         }
1002
1003         rc = obd_connect(env, &osp->opd_storage_exp, obd, &obd->obd_uuid, data,
1004                          NULL);
1005         if (rc) {
1006                 CERROR("%s: cannot connect to next dev %s: rc = %d\n",
1007                        osp->opd_obd->obd_name, nextdev, rc);
1008                 GOTO(out, rc);
1009         }
1010
1011         osp->opd_dt_dev.dd_lu_dev.ld_site =
1012                 osp->opd_storage_exp->exp_obd->obd_lu_dev->ld_site;
1013         LASSERT(osp->opd_dt_dev.dd_lu_dev.ld_site);
1014         osp->opd_storage = lu2dt_dev(osp->opd_storage_exp->exp_obd->obd_lu_dev);
1015
1016 out:
1017         OBD_FREE_PTR(data);
1018         RETURN(rc);
1019 }
1020
1021 /**
1022  * Determine if the lock needs to be cancelled
1023  *
1024  * Determine if the unused lock should be cancelled before replay, see
1025  * (ldlm_cancel_no_wait_policy()). Currently, only inode bits lock exists
1026  * between MDTs.
1027  *
1028  * \param[in] lock      lock to be checked.
1029  *
1030  * \retval              1 if the lock needs to be cancelled before replay.
1031  * \retval              0 if the lock does not need to be cancelled before
1032  *                      replay.
1033  */
1034 static int osp_cancel_weight(struct ldlm_lock *lock)
1035 {
1036         if (lock->l_resource->lr_type != LDLM_IBITS)
1037                 RETURN(0);
1038
1039         RETURN(1);
1040 }
1041
1042 /**
1043  * Initialize OSP device according to the parameters in the configuration
1044  * log \a cfg.
1045  *
1046  * Reconstruct the local device name from the configuration profile, and
1047  * initialize necessary threads and structures according to the OSP type
1048  * (MDT or OST).
1049  *
1050  * Since there is no record in the MDT configuration for the local disk
1051  * device, we have to extract this from elsewhere in the profile.
1052  * The only information we get at setup is from the OSC records:
1053  * setup 0:{fsname}-OSTxxxx-osc[-MDTxxxx] 1:lustre-OST0000_UUID 2:NID
1054  *
1055  * Note: configs generated by Lustre 1.8 are missing the -MDTxxxx part,
1056  * so, we need to reconstruct the name of the underlying OSD from this:
1057  * {fsname}-{svname}-osd, for example "lustre-MDT0000-osd".
1058  *
1059  * \param[in] env       execution environment
1060  * \param[in] osp       OSP device
1061  * \param[in] ldt       lu device type of OSP
1062  * \param[in] cfg       configuration log
1063  *
1064  * \retval 0            0 if OSP initialization succeeded.
1065  * \retval negative     negative errno if OSP initialization failed.
1066  */
1067 static int osp_init0(const struct lu_env *env, struct osp_device *osp,
1068                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
1069 {
1070         struct obd_device       *obd;
1071         struct obd_import       *imp;
1072         char *src, *tgt, *osdname = NULL;
1073         const char *mdt;
1074         int                     rc;
1075         u32 idx;
1076
1077         ENTRY;
1078
1079         mutex_init(&osp->opd_async_requests_mutex);
1080         INIT_LIST_HEAD(&osp->opd_async_updates);
1081         init_rwsem(&osp->opd_async_updates_rwsem);
1082         atomic_set(&osp->opd_async_updates_count, 0);
1083
1084         obd = class_name2obd(lustre_cfg_string(cfg, 0));
1085         if (obd == NULL) {
1086                 CERROR("Cannot find obd with name %s\n",
1087                        lustre_cfg_string(cfg, 0));
1088                 RETURN(-ENODEV);
1089         }
1090         osp->opd_obd = obd;
1091
1092         src = lustre_cfg_string(cfg, 0);
1093         if (src == NULL)
1094                 RETURN(-EINVAL);
1095
1096         tgt = strrchr(src, '-');
1097         if (tgt == NULL) {
1098                 CERROR("%s: invalid target name %s: rc = %d\n",
1099                        osp->opd_obd->obd_name, lustre_cfg_string(cfg, 0),
1100                        -EINVAL);
1101                 RETURN(-EINVAL);
1102         }
1103
1104         if (strncmp(tgt, "-osc", 4) == 0) {
1105                 /* Old OSC name fsname-OSTXXXX-osc */
1106                 for (tgt--; tgt > src && *tgt != '-'; tgt--)
1107                         ;
1108                 if (tgt == src) {
1109                         CERROR("%s: invalid target name %s: rc = %d\n",
1110                                osp->opd_obd->obd_name,
1111                                lustre_cfg_string(cfg, 0), -EINVAL);
1112                         RETURN(-EINVAL);
1113                 }
1114
1115                 if (strncmp(tgt, "-OST", 4) != 0) {
1116                         CERROR("%s: invalid target name %s: rc = %d\n",
1117                                osp->opd_obd->obd_name,
1118                                lustre_cfg_string(cfg, 0), -EINVAL);
1119                         RETURN(-EINVAL);
1120                 }
1121
1122                 rc = target_name2index(tgt + 1, &idx, &mdt);
1123                 if (rc < 0 || rc & LDD_F_SV_ALL || mdt[0] != '-') {
1124                         CERROR("%s: invalid OST index in '%s': rc = %d\n",
1125                                osp->opd_obd->obd_name, src, -EINVAL);
1126                         RETURN(-EINVAL);
1127                 }
1128                 osp->opd_index = idx;
1129                 osp->opd_group = 0;
1130                 idx = tgt - src;
1131         } else {
1132                 /* New OSC name fsname-OSTXXXX-osc-MDTXXXX */
1133                 if (strncmp(tgt, "-MDT", 4) != 0 &&
1134                     strncmp(tgt, "-OST", 4) != 0) {
1135                         CERROR("%s: invalid target name %s: rc = %d\n",
1136                                osp->opd_obd->obd_name,
1137                                lustre_cfg_string(cfg, 0), -EINVAL);
1138                         RETURN(-EINVAL);
1139                 }
1140
1141                 rc = target_name2index(tgt + 1, &idx, &mdt);
1142                 if (rc < 0 || rc & LDD_F_SV_ALL || *mdt != '\0') {
1143                         CERROR("%s: invalid OST index in '%s': rc = %d\n",
1144                                osp->opd_obd->obd_name, src, -EINVAL);
1145                         RETURN(-EINVAL);
1146                 }
1147
1148                 /* Get MDT index from the name and set it to opd_group,
1149                  * which will be used by OSP to connect with OST */
1150                 osp->opd_group = idx;
1151                 if (tgt - src <= 12) {
1152                         CERROR("%s: invalid mdt index from %s: rc =%d\n",
1153                                osp->opd_obd->obd_name,
1154                                lustre_cfg_string(cfg, 0), -EINVAL);
1155                         RETURN(-EINVAL);
1156                 }
1157
1158                 if (strncmp(tgt - 12, "-MDT", 4) == 0)
1159                         osp->opd_connect_mdt = 1;
1160
1161                 rc = target_name2index(tgt - 11, &idx, &mdt);
1162                 if (rc < 0 || rc & LDD_F_SV_ALL || mdt[0] != '-') {
1163                         CERROR("%s: invalid OST index in '%s': rc =%d\n",
1164                                osp->opd_obd->obd_name, src, -EINVAL);
1165                         RETURN(-EINVAL);
1166                 }
1167
1168                 osp->opd_index = idx;
1169                 idx = tgt - src - 12;
1170         }
1171         /* check the fsname length, and after this everything else will fit */
1172         if (idx > MTI_NAME_MAXLEN) {
1173                 CERROR("%s: fsname too long in '%s': rc = %d\n",
1174                        osp->opd_obd->obd_name, src, -EINVAL);
1175                 RETURN(-EINVAL);
1176         }
1177
1178         OBD_ALLOC(osdname, MAX_OBD_NAME);
1179         if (osdname == NULL)
1180                 RETURN(-ENOMEM);
1181
1182         memcpy(osdname, src, idx); /* copy just the fsname part */
1183         osdname[idx] = '\0';
1184
1185         mdt = strstr(mdt, "-MDT");
1186         if (mdt == NULL) /* 1.8 configs don't have "-MDT0000" at the end */
1187                 strcat(osdname, "-MDT0000");
1188         else
1189                 strcat(osdname, mdt);
1190         strcat(osdname, "-osd");
1191         CDEBUG(D_HA, "%s: connect to %s (%s)\n", obd->obd_name, osdname, src);
1192
1193         osp->opd_dt_dev.dd_lu_dev.ld_ops = &osp_lu_ops;
1194         osp->opd_dt_dev.dd_ops = &osp_dt_ops;
1195
1196         obd->obd_lu_dev = &osp->opd_dt_dev.dd_lu_dev;
1197
1198         rc = osp_connect_to_osd(env, osp, osdname);
1199         if (rc)
1200                 GOTO(out_fini, rc);
1201
1202         rc = ptlrpcd_addref();
1203         if (rc)
1204                 GOTO(out_disconnect, rc);
1205
1206         rc = client_obd_setup(obd, cfg);
1207         if (rc) {
1208                 CERROR("%s: can't setup obd: rc = %d\n", osp->opd_obd->obd_name,
1209                        rc);
1210                 GOTO(out_ref, rc);
1211         }
1212
1213         osp_tunables_init(osp);
1214
1215         rc = obd_fid_init(osp->opd_obd, NULL, osp->opd_connect_mdt ?
1216                           LUSTRE_SEQ_METADATA : LUSTRE_SEQ_DATA);
1217         if (rc) {
1218                 CERROR("%s: fid init error: rc = %d\n",
1219                        osp->opd_obd->obd_name, rc);
1220                 GOTO(out_proc, rc);
1221         }
1222
1223         if (!osp->opd_connect_mdt) {
1224                 /* Initialize last id from the storage - will be
1225                  * used in orphan cleanup. */
1226                 if (!osp->opd_storage->dd_rdonly) {
1227                         rc = osp_last_used_init(env, osp);
1228                         if (rc)
1229                                 GOTO(out_fid, rc);
1230                 }
1231
1232                 /* Initialize precreation thread, it handles new
1233                  * connections as well. */
1234                 rc = osp_init_precreate(osp);
1235                 if (rc)
1236                         GOTO(out_last_used, rc);
1237
1238                 /*
1239                  * Initialize synhronization mechanism taking
1240                  * care of propogating changes to OST in near
1241                  * transactional manner.
1242                  */
1243                 rc = osp_sync_init(env, osp);
1244                 if (rc < 0)
1245                         GOTO(out_precreat, rc);
1246         } else {
1247                 osp->opd_got_disconnected = 1;
1248                 rc = osp_update_init(osp);
1249                 if (rc != 0)
1250                         GOTO(out_fid, rc);
1251         }
1252
1253         rc = osp_init_statfs(osp);
1254         if (rc)
1255                 GOTO(out_precreat, rc);
1256
1257         ns_register_cancel(obd->obd_namespace, osp_cancel_weight);
1258
1259         /*
1260          * Initiate connect to OST
1261          */
1262         imp = obd->u.cli.cl_import;
1263
1264         rc = ptlrpc_init_import(imp);
1265         if (rc)
1266                 GOTO(out, rc);
1267         if (osdname)
1268                 OBD_FREE(osdname, MAX_OBD_NAME);
1269         init_waitqueue_head(&osp->opd_out_waitq);
1270         RETURN(0);
1271
1272 out:
1273         if (!osp->opd_connect_mdt)
1274                 /* stop sync thread */
1275                 osp_sync_fini(osp);
1276 out_precreat:
1277         /* stop precreate thread */
1278         if (!osp->opd_connect_mdt)
1279                 osp_precreate_fini(osp);
1280         else
1281                 osp_update_fini(env, osp);
1282 out_last_used:
1283         if (!osp->opd_connect_mdt)
1284                 osp_last_used_fini(env, osp);
1285 out_fid:
1286         obd_fid_fini(osp->opd_obd);
1287 out_proc:
1288         osp_tunables_fini(osp);
1289         client_obd_cleanup(obd);
1290 out_ref:
1291         ptlrpcd_decref();
1292 out_disconnect:
1293         obd_disconnect(osp->opd_storage_exp);
1294 out_fini:
1295         if (osdname)
1296                 OBD_FREE(osdname, MAX_OBD_NAME);
1297         RETURN(rc);
1298 }
1299
1300 /**
1301  * Implementation of lu_device_type_operations::ldto_device_free
1302  *
1303  * Free the OSP device in memory.  No return value is needed for now,
1304  * so always return NULL to comply with the interface.
1305  *
1306  * \param[in] env       execution environment
1307  * \param[in] lu        lu_device of OSP
1308  *
1309  * \retval NULL         NULL unconditionally
1310  */
1311 static struct lu_device *osp_device_free(const struct lu_env *env,
1312                                          struct lu_device *lu)
1313 {
1314         struct osp_device *osp = lu2osp_dev(lu);
1315
1316         lu_site_print(env, lu->ld_site, &lu->ld_ref, D_ERROR,
1317                       lu_cdebug_printer);
1318         dt_device_fini(&osp->opd_dt_dev);
1319         OBD_FREE_PTR(osp);
1320
1321         return NULL;
1322 }
1323
1324 /**
1325  * Implementation of lu_device_type_operations::ldto_device_alloc
1326  *
1327  * This function allocates and initializes OSP device in memory according to
1328  * the config log.
1329  *
1330  * \param[in] env       execution environment
1331  * \param[in] type      device type of OSP
1332  * \param[in] lcfg      config log
1333  *
1334  * \retval pointer              the pointer of allocated OSP if succeed.
1335  * \retval ERR_PTR(errno)       ERR_PTR(errno) if failed.
1336  */
1337 static struct lu_device *osp_device_alloc(const struct lu_env *env,
1338                                           struct lu_device_type *type,
1339                                           struct lustre_cfg *lcfg)
1340 {
1341         struct osp_device *osp;
1342         struct lu_device  *ld;
1343
1344         OBD_ALLOC_PTR(osp);
1345         if (osp == NULL) {
1346                 ld = ERR_PTR(-ENOMEM);
1347         } else {
1348                 int rc;
1349
1350                 ld = osp2lu_dev(osp);
1351                 dt_device_init(&osp->opd_dt_dev, type);
1352                 rc = osp_init0(env, osp, type, lcfg);
1353                 if (rc != 0) {
1354                         osp_device_free(env, ld);
1355                         ld = ERR_PTR(rc);
1356                 }
1357         }
1358         return ld;
1359 }
1360
1361 /**
1362  * Implementation of lu_device_type_operations::ldto_device_fini
1363  *
1364  * This function cleans up the OSP device, i.e. release and free those
1365  * attached items in osp_device.
1366  *
1367  * \param[in] env       execution environment
1368  * \param[in] ld        lu_device of OSP
1369  *
1370  * \retval NULL                 NULL if cleanup succeeded.
1371  * \retval ERR_PTR(errno)       ERR_PTR(errno) if cleanup failed.
1372  */
1373 static struct lu_device *osp_device_fini(const struct lu_env *env,
1374                                          struct lu_device *ld)
1375 {
1376         struct osp_device *osp = lu2osp_dev(ld);
1377         int                rc;
1378
1379         ENTRY;
1380
1381         if (osp->opd_async_requests != NULL) {
1382                 osp_update_request_destroy(env, osp->opd_async_requests);
1383                 osp->opd_async_requests = NULL;
1384         }
1385
1386         if (osp->opd_storage_exp) {
1387                 /* wait for the commit callbacks to complete */
1388                 wait_event(osp->opd_sync_waitq,
1389                           atomic_read(&osp->opd_commits_registered) == 0);
1390                 obd_disconnect(osp->opd_storage_exp);
1391         }
1392
1393         LASSERT(osp->opd_obd);
1394
1395         rc = client_obd_cleanup(osp->opd_obd);
1396         if (rc != 0) {
1397                 ptlrpcd_decref();
1398                 RETURN(ERR_PTR(rc));
1399         }
1400
1401         osp_tunables_fini(osp);
1402
1403         ptlrpcd_decref();
1404
1405         RETURN(NULL);
1406 }
1407
1408 /**
1409  * Implementation of obd_ops::o_reconnect
1410  *
1411  * This function is empty and does not need to do anything for now.
1412  */
1413 static int osp_reconnect(const struct lu_env *env,
1414                          struct obd_export *exp, struct obd_device *obd,
1415                          struct obd_uuid *cluuid,
1416                          struct obd_connect_data *data,
1417                          void *localdata)
1418 {
1419         return 0;
1420 }
1421
1422 /*
1423  * Implementation of obd_ops::o_connect
1424  *
1425  * Connect OSP to the remote target (MDT or OST). Allocate the
1426  * export and return it to the LOD, which calls this function
1427  * for each OSP to connect it to the remote target. This function
1428  * is currently only called once per OSP.
1429  *
1430  * \param[in] env       execution environment
1431  * \param[out] exp      export connected to OSP
1432  * \param[in] obd       OSP device
1433  * \param[in] cluuid    OSP device client uuid
1434  * \param[in] data      connect_data to be used to connect to the remote
1435  *                      target
1436  * \param[in] localdata necessary for the API interface, but not used in
1437  *                      this function
1438  *
1439  * \retval 0            0 if the connection succeeded.
1440  * \retval negative     negative errno if the connection failed.
1441  */
1442 static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp,
1443                            struct obd_device *obd, struct obd_uuid *cluuid,
1444                            struct obd_connect_data *data, void *localdata)
1445 {
1446         struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
1447         int rc;
1448
1449         ENTRY;
1450
1451         LASSERT(data != NULL);
1452         LASSERT(data->ocd_connect_flags & OBD_CONNECT_INDEX);
1453
1454         rc = client_connect_import(env, &osp->opd_exp, obd, cluuid, data,
1455                                    localdata);
1456         if (rc)
1457                 RETURN(rc);
1458
1459         *exp = osp->opd_exp;
1460
1461         osp->opd_obd->u.cli.cl_seq->lcs_exp = class_export_get(osp->opd_exp);
1462         /* precreate thread can be waiting for us to initialize fld client */
1463         wake_up(&osp->opd_pre_waitq);
1464
1465         RETURN(rc);
1466 }
1467
1468 /**
1469  * Implementation of obd_ops::o_disconnect
1470  *
1471  * Disconnect the export for the OSP.  This is called by LOD to release the
1472  * OSP during cleanup (\see lod_del_device()). The OSP will be released after
1473  * the export is released.
1474  *
1475  * \param[in] exp       export to be disconnected.
1476  *
1477  * \retval 0            0 if disconnection succeed
1478  * \retval negative     negative errno if disconnection failed
1479  */
1480 static int osp_obd_disconnect(struct obd_export *exp)
1481 {
1482         struct obd_device *obd = exp->exp_obd;
1483         int                rc;
1484         ENTRY;
1485
1486         rc = class_disconnect(exp);
1487         if (rc) {
1488                 CERROR("%s: class disconnect error: rc = %d\n",
1489                        obd->obd_name, rc);
1490                 RETURN(rc);
1491         }
1492
1493         /* destroy the device */
1494         class_manual_cleanup(obd);
1495
1496         RETURN(rc);
1497 }
1498
1499 /**
1500  * Implementation of obd_ops::o_statfs
1501  *
1502  * Send a RPC to the remote target to get statfs status. This is only used
1503  * in lprocfs helpers by obd_statfs.
1504  *
1505  * \param[in] env       execution environment
1506  * \param[in] exp       connection state from this OSP to the parent (LOD)
1507  *                      device
1508  * \param[out] osfs     hold the statfs result
1509  * \param[in] unused    Not used in this function for now
1510  * \param[in] flags     flags to indicate how OSP will issue the RPC
1511  *
1512  * \retval 0            0 if statfs succeeded.
1513  * \retval negative     negative errno if statfs failed.
1514  */
1515 static int osp_obd_statfs(const struct lu_env *env, struct obd_export *exp,
1516                           struct obd_statfs *osfs, time64_t unused, __u32 flags)
1517 {
1518         struct obd_statfs       *msfs;
1519         struct ptlrpc_request   *req;
1520         struct obd_import       *imp = NULL, *imp0;
1521         int                      rc;
1522
1523         ENTRY;
1524
1525         /* Since the request might also come from lprocfs, so we need
1526          * sync this with client_disconnect_export Bug15684
1527          */
1528         with_imp_locked(exp->exp_obd, imp0, rc)
1529                 imp = class_import_get(imp0);
1530         if (rc)
1531                 RETURN(rc);
1532
1533         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
1534
1535         class_import_put(imp);
1536
1537         if (req == NULL)
1538                 RETURN(-ENOMEM);
1539
1540         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
1541         if (rc) {
1542                 ptlrpc_request_free(req);
1543                 RETURN(rc);
1544         }
1545         ptlrpc_request_set_replen(req);
1546         req->rq_request_portal = OST_CREATE_PORTAL;
1547         ptlrpc_at_set_req_timeout(req);
1548
1549         if (flags & OBD_STATFS_NODELAY) {
1550                 /* procfs requests not want stat in wait for avoid deadlock */
1551                 req->rq_no_resend = 1;
1552                 req->rq_no_delay = 1;
1553         }
1554
1555         rc = ptlrpc_queue_wait(req);
1556         if (rc)
1557                 GOTO(out, rc);
1558
1559         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
1560         if (msfs == NULL)
1561                 GOTO(out, rc = -EPROTO);
1562
1563         *osfs = *msfs;
1564
1565         EXIT;
1566 out:
1567         ptlrpc_req_finished(req);
1568         return rc;
1569 }
1570
1571 /**
1572  * Implementation of obd_ops::o_import_event
1573  *
1574  * This function is called when some related import event happens. It will
1575  * mark the necessary flags according to the event and notify the necessary
1576  * threads (mainly precreate thread).
1577  *
1578  * \param[in] obd       OSP OBD device
1579  * \param[in] imp       import attached from OSP to remote (OST/MDT) service
1580  * \param[in] event     event related to remote service (IMP_EVENT_*)
1581  *
1582  * \retval 0            0 if the event handling succeeded.
1583  * \retval negative     negative errno if the event handling failed.
1584  */
1585 static int osp_import_event(struct obd_device *obd, struct obd_import *imp,
1586                             enum obd_import_event event)
1587 {
1588         struct osp_device *d = lu2osp_dev(obd->obd_lu_dev);
1589         int rc;
1590
1591         switch (event) {
1592         case IMP_EVENT_DISCON:
1593                 d->opd_got_disconnected = 1;
1594                 d->opd_imp_connected = 0;
1595                 if (d->opd_connect_mdt)
1596                         break;
1597
1598                 if (d->opd_pre != NULL) {
1599                         osp_pre_update_status(d, -ENODEV);
1600                         wake_up(&d->opd_pre_waitq);
1601                 }
1602
1603                 CDEBUG(D_HA, "got disconnected\n");
1604                 break;
1605         case IMP_EVENT_INACTIVE:
1606                 d->opd_imp_active = 0;
1607                 d->opd_imp_connected = 0;
1608                 d->opd_obd->obd_inactive = 1;
1609                 if (d->opd_connect_mdt)
1610                         break;
1611                 if (d->opd_pre != NULL) {
1612                         /* Import is invalid, we can`t get stripes so
1613                          * wakeup waiters */
1614                         rc = imp->imp_deactive ? -ESHUTDOWN : -ENODEV;
1615                         osp_pre_update_status(d, rc);
1616                         wake_up(&d->opd_pre_waitq);
1617                 }
1618
1619                 CDEBUG(D_HA, "got inactive\n");
1620                 break;
1621         case IMP_EVENT_ACTIVE:
1622                 d->opd_imp_active = 1;
1623
1624                 d->opd_new_connection = 1;
1625                 d->opd_imp_connected = 1;
1626                 d->opd_imp_seen_connected = 1;
1627                 d->opd_obd->obd_inactive = 0;
1628                 wake_up(&d->opd_pre_waitq);
1629                 if (d->opd_connect_mdt)
1630                         break;
1631
1632                 osp_sync_check_for_work(d);
1633                 CDEBUG(D_HA, "got connected\n");
1634                 break;
1635         case IMP_EVENT_INVALIDATE:
1636                 if (d->opd_connect_mdt)
1637                         osp_invalidate_request(d);
1638
1639                 if (obd->obd_namespace == NULL)
1640                         break;
1641                 ldlm_namespace_cleanup(obd->obd_namespace, LDLM_FL_LOCAL_ONLY);
1642                 break;
1643         case IMP_EVENT_OCD:
1644         case IMP_EVENT_DEACTIVATE:
1645         case IMP_EVENT_ACTIVATE:
1646                 break;
1647         default:
1648                 CERROR("%s: unsupported import event: %#x\n",
1649                        obd->obd_name, event);
1650         }
1651         return 0;
1652 }
1653
1654 /**
1655  * Implementation of obd_ops: o_iocontrol
1656  *
1657  * This function is the ioctl handler for OSP. Note: lctl will access the OSP
1658  * directly by ioctl, instead of through the MDS stack.
1659  *
1660  * param[in] cmd        ioctl command.
1661  * param[in] exp        export of this OSP.
1662  * param[in] len        data length of \a karg.
1663  * param[in] karg       input argument which is packed as
1664  *                      obd_ioctl_data
1665  * param[out] uarg      pointer to userspace buffer (must access by
1666  *                      copy_to_user()).
1667  *
1668  * \retval 0            0 if the ioctl handling succeeded.
1669  * \retval negative     negative errno if the ioctl handling failed.
1670  */
1671 static int osp_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1672                          void *karg, void __user *uarg)
1673 {
1674         struct obd_device *obd = exp->exp_obd;
1675         struct osp_device *d;
1676         struct obd_ioctl_data *data;
1677         int rc = -EINVAL;
1678
1679         ENTRY;
1680         CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n",
1681                exp->exp_obd->obd_name, cmd, len, karg, uarg);
1682         if (unlikely(karg == NULL)) {
1683                 CERROR("%s: iocontrol from '%s' cmd=%x karg=NULL: rc = %d\n",
1684                        obd->obd_name, current->comm, cmd, rc);
1685                 RETURN(rc);
1686         }
1687         data = karg;
1688
1689         LASSERT(obd->obd_lu_dev);
1690         d = lu2osp_dev(obd->obd_lu_dev);
1691         LASSERT(d->opd_dt_dev.dd_ops == &osp_dt_ops);
1692
1693         if (!try_module_get(THIS_MODULE)) {
1694                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
1695                        module_name(THIS_MODULE));
1696                 return -EINVAL;
1697         }
1698
1699         switch (cmd) {
1700         case OBD_IOC_CLIENT_RECOVER:
1701                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
1702                                            data->ioc_inlbuf1, 0);
1703                 if (rc > 0)
1704                         rc = 0;
1705                 break;
1706 #ifdef IOC_OSC_SET_ACTIVE
1707         case_OBD_IOC_DEPRECATED_FT(IOC_OSC_SET_ACTIVE, obd->obd_name, 2, 17);
1708 #endif
1709         case OBD_IOC_SET_ACTIVE:
1710                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
1711                                               data->ioc_offset);
1712                 break;
1713         default:
1714                 rc = OBD_IOC_ERROR(obd->obd_name, cmd, "unrecognized", -ENOTTY);
1715                 break;
1716         }
1717         module_put(THIS_MODULE);
1718         return rc;
1719 }
1720
1721 /**
1722  * Implementation of obd_ops::o_get_info
1723  *
1724  * Retrieve information by key. Retrieval starts from the top layer
1725  * (MDT) of the MDS stack and traverses the stack by calling the
1726  * obd_get_info() method of the next sub-layer.
1727  *
1728  * \param[in] env       execution environment
1729  * \param[in] exp       export of this OSP
1730  * \param[in] keylen    length of \a key
1731  * \param[in] key       the key
1732  * \param[out] vallen   length of \a val
1733  * \param[out] val      holds the value returned by the key
1734  *
1735  * \retval 0            0 if getting information succeeded.
1736  * \retval negative     negative errno if getting information failed.
1737  */
1738 static int osp_obd_get_info(const struct lu_env *env, struct obd_export *exp,
1739                             __u32 keylen, void *key, __u32 *vallen, void *val)
1740 {
1741         int rc = -EINVAL;
1742
1743         if (KEY_IS(KEY_OSP_CONNECTED)) {
1744                 struct obd_device       *obd = exp->exp_obd;
1745                 struct osp_device       *osp;
1746
1747                 if (!obd->obd_set_up || obd->obd_stopping)
1748                         RETURN(-EAGAIN);
1749
1750                 osp = lu2osp_dev(obd->obd_lu_dev);
1751                 LASSERT(osp);
1752                 /*
1753                  * 1.8/2.0 behaviour is that OST being connected once at least
1754                  * is considered "healthy". and one "healthy" OST is enough to
1755                  * allow lustre clients to connect to MDS
1756                  */
1757                 RETURN(!osp->opd_imp_seen_connected);
1758         }
1759
1760         RETURN(rc);
1761 }
1762
1763 static int osp_obd_set_info_async(const struct lu_env *env,
1764                                   struct obd_export *exp,
1765                                   u32 keylen, void *key,
1766                                   u32 vallen, void *val,
1767                                   struct ptlrpc_request_set *set)
1768 {
1769         struct obd_device       *obd = exp->exp_obd;
1770         struct obd_import       *imp = obd->u.cli.cl_import;
1771         struct osp_device       *osp;
1772         struct ptlrpc_request   *req;
1773         char                    *tmp;
1774         int                      rc;
1775
1776         if (KEY_IS(KEY_SPTLRPC_CONF)) {
1777                 sptlrpc_conf_client_adapt(exp->exp_obd);
1778                 RETURN(0);
1779         }
1780
1781         LASSERT(set != NULL);
1782         if (!obd->obd_set_up || obd->obd_stopping)
1783                 RETURN(-EAGAIN);
1784         osp = lu2osp_dev(obd->obd_lu_dev);
1785
1786         req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
1787         if (req == NULL)
1788                 RETURN(-ENOMEM);
1789
1790         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
1791                              RCL_CLIENT, keylen);
1792         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
1793                              RCL_CLIENT, vallen);
1794         if (osp->opd_connect_mdt)
1795                 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_SET_INFO);
1796         else
1797                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
1798         if (rc) {
1799                 ptlrpc_request_free(req);
1800                 RETURN(rc);
1801         }
1802
1803         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
1804         memcpy(tmp, key, keylen);
1805         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
1806         memcpy(tmp, val, vallen);
1807
1808         ptlrpc_request_set_replen(req);
1809         ptlrpc_set_add_req(set, req);
1810         ptlrpc_check_set(NULL, set);
1811
1812         RETURN(0);
1813 }
1814
1815 /* context key constructor/destructor: mdt_key_init, mdt_key_fini */
1816 LU_KEY_INIT_FINI(osp, struct osp_thread_info);
1817 static void osp_key_exit(const struct lu_context *ctx,
1818                          struct lu_context_key *key, void *data)
1819 {
1820         struct osp_thread_info *info = data;
1821
1822         info->osi_attr.la_valid = 0;
1823 }
1824
1825 struct lu_context_key osp_thread_key = {
1826         .lct_tags = LCT_MD_THREAD,
1827         .lct_init = osp_key_init,
1828         .lct_fini = osp_key_fini,
1829         .lct_exit = osp_key_exit
1830 };
1831
1832 /* context key constructor/destructor: mdt_txn_key_init, mdt_txn_key_fini */
1833 LU_KEY_INIT_FINI(osp_txn, struct osp_txn_info);
1834
1835 struct lu_context_key osp_txn_key = {
1836         .lct_tags = LCT_OSP_THREAD,
1837         .lct_init = osp_txn_key_init,
1838         .lct_fini = osp_txn_key_fini
1839 };
1840 LU_TYPE_INIT_FINI(osp, &osp_thread_key, &osp_txn_key);
1841
1842 static const struct lu_device_type_operations osp_device_type_ops = {
1843         .ldto_init           = osp_type_init,
1844         .ldto_fini           = osp_type_fini,
1845
1846         .ldto_start          = osp_type_start,
1847         .ldto_stop           = osp_type_stop,
1848
1849         .ldto_device_alloc   = osp_device_alloc,
1850         .ldto_device_free    = osp_device_free,
1851
1852         .ldto_device_fini    = osp_device_fini
1853 };
1854
1855 static struct lu_device_type osp_device_type = {
1856         .ldt_tags     = LU_DEVICE_DT,
1857         .ldt_name     = LUSTRE_OSP_NAME,
1858         .ldt_ops      = &osp_device_type_ops,
1859         .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD,
1860 };
1861
1862 static const struct obd_ops osp_obd_device_ops = {
1863         .o_owner        = THIS_MODULE,
1864         .o_add_conn     = client_import_add_conn,
1865         .o_del_conn     = client_import_del_conn,
1866         .o_reconnect    = osp_reconnect,
1867         .o_connect      = osp_obd_connect,
1868         .o_disconnect   = osp_obd_disconnect,
1869         .o_get_info     = osp_obd_get_info,
1870         .o_set_info_async = osp_obd_set_info_async,
1871         .o_import_event = osp_import_event,
1872         .o_iocontrol    = osp_iocontrol,
1873         .o_statfs       = osp_obd_statfs,
1874         .o_fid_init     = client_fid_init,
1875         .o_fid_fini     = client_fid_fini,
1876 };
1877
1878 /**
1879  * Initialize OSP module.
1880  *
1881  * Register device types OSP and Light Weight Proxy (LWP) (\see lwp_dev.c)
1882  * in obd_types (\see class_obd.c).  Initialize procfs for the
1883  * the OSP device.  Note: OSP was called OSC before Lustre 2.4,
1884  * so for compatibility it still uses the name "osc" in procfs.
1885  * This is called at module load time.
1886  *
1887  * \retval 0            0 if initialization succeeds.
1888  * \retval negative     negative errno if initialization failed.
1889  */
1890 static int __init osp_init(void)
1891 {
1892         struct obd_type *sym;
1893         int rc;
1894
1895         rc = libcfs_setup();
1896         if (rc)
1897                 return rc;
1898
1899         rc = lu_kmem_init(osp_caches);
1900         if (rc)
1901                 return rc;
1902
1903         rc = class_register_type(&osp_obd_device_ops, NULL, false,
1904                                  LUSTRE_OSP_NAME, &osp_device_type);
1905         if (rc != 0) {
1906                 lu_kmem_fini(osp_caches);
1907                 return rc;
1908         }
1909
1910         rc = class_register_type(&lwp_obd_device_ops, NULL, false,
1911                                  LUSTRE_LWP_NAME, &lwp_device_type);
1912         if (rc != 0) {
1913                 class_unregister_type(LUSTRE_OSP_NAME);
1914                 lu_kmem_fini(osp_caches);
1915                 return rc;
1916         }
1917
1918         /* create "osc" entry for compatibility purposes */
1919         sym = class_add_symlinks(LUSTRE_OSC_NAME, true);
1920         if (IS_ERR(sym)) {
1921                 rc = PTR_ERR(sym);
1922                 /* does real "osc" already exist ? */
1923                 if (rc == -EEXIST)
1924                         rc = 0;
1925         }
1926
1927         return rc;
1928 }
1929
1930 /**
1931  * Finalize OSP module.
1932  *
1933  * This callback is called when kernel unloads OSP module from memory, and
1934  * it will deregister OSP and LWP device type from obd_types (\see class_obd.c).
1935  */
1936 static void __exit osp_exit(void)
1937 {
1938         struct obd_type *sym = class_search_type(LUSTRE_OSC_NAME);
1939
1940         /* if this was never fully initialized by the osc layer
1941          * then we are responsible for freeing this obd_type
1942          */
1943         if (sym) {
1944                 /* final put if we manage this obd type */
1945                 if (sym->typ_sym_filter)
1946                         kobject_put(&sym->typ_kobj);
1947                 /* put reference taken by class_search_type */
1948                 kobject_put(&sym->typ_kobj);
1949         }
1950
1951         class_unregister_type(LUSTRE_LWP_NAME);
1952         class_unregister_type(LUSTRE_OSP_NAME);
1953         lu_kmem_fini(osp_caches);
1954 }
1955
1956 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
1957 MODULE_DESCRIPTION("Lustre OSD Storage Proxy ("LUSTRE_OSP_NAME")");
1958 MODULE_VERSION(LUSTRE_VERSION_STRING);
1959 MODULE_LICENSE("GPL");
1960
1961 module_init(osp_init);
1962 module_exit(osp_exit);