Whamcloud - gitweb
LU-16634 misc: replace obsolete ioctl numbers
[fs/lustre-release.git] / lustre / osp / osp_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/osp/osp_dev.c
32  *
33  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
34  * Author: Mikhail Pershin <mike.pershin@intel.com>
35  * Author: Di Wang <di.wang@intel.com>
36  */
37 /*
38  * The Object Storage Proxy (OSP) module provides an implementation of
39  * the DT API for remote MDTs and OSTs. Every local OSP device (or
40  * object) is a proxy for a remote OSD device (or object). Thus OSP
41  * converts DT operations into RPCs, which are sent to the OUT service
42  * on a remote target, converted back to DT operations, and
43  * executed. Of course there are many ways in which this description
44  * is inaccurate but it's a good enough mental model. OSP is used by
45  * the MDT stack in several ways:
46  *
47  * - OSP devices allocate FIDs for the stripe sub-objects of a striped
48  *   file or directory.
49  *
50  * - OSP objects represent the remote MDT and OST objects that are
51  *   the stripes of a striped object.
52  *
53  * - OSP devices log, send, and track synchronous operations (setattr
54  *   and unlink) to remote targets.
55  *
56  * - OSP objects are the bottom slice of the compound LU object
57  *   representing a remote MDT object: MDT/MDD/LOD/OSP.
58  *
59  * - OSP objects are used by LFSCK to represent remote OST objects
60  *   during the verification of MDT-OST consistency.
61  *
62  * - OSP devices batch idempotent requests (declare_attr_get() and
63  *   declare_xattr_get()) to the remote target and cache their results.
64  *
65  * In addition the OSP layer implements a subset of the OBD device API
66  * to support being a client of a remote target, connecting to other
67  * layers, and FID allocation.
68  */
69
70 #define DEBUG_SUBSYSTEM S_MDS
71
72 #include <linux/kthread.h>
73
74 #include <uapi/linux/lustre/lustre_ioctl.h>
75 #include <lustre_ioctl_old.h>
76 #include <lustre_log.h>
77 #include <lustre_obdo.h>
78 #include <uapi/linux/lustre/lustre_param.h>
79 #include <obd_class.h>
80
81 #include "osp_internal.h"
82
83 /* Slab for OSP object allocation */
84 struct kmem_cache *osp_object_kmem;
85
86 static struct lu_kmem_descr osp_caches[] = {
87         {
88                 .ckd_cache = &osp_object_kmem,
89                 .ckd_name  = "osp_obj",
90                 .ckd_size  = sizeof(struct osp_object)
91         },
92         {
93                 .ckd_cache = NULL
94         }
95 };
96
97 /**
98  * Implementation of lu_device_operations::ldo_object_alloc
99  *
100  * Allocates an OSP object in memory, whose FID is on the remote target.
101  *
102  * \param[in] env       execution environment
103  * \param[in] hdr       The header of the object stack. If it is NULL, it
104  *                      means the object is not built from top device, i.e.
105  *                      it is a sub-stripe object of striped directory or
106  *                      an OST object.
107  * \param[in] d         OSP device
108  *
109  * \retval object       object being created if the creation succeed.
110  * \retval NULL         NULL if the creation failed.
111  */
112 static struct lu_object *osp_object_alloc(const struct lu_env *env,
113                                           const struct lu_object_header *hdr,
114                                           struct lu_device *d)
115 {
116         struct osp_object *o;
117
118         OBD_SLAB_ALLOC_PTR_GFP(o, osp_object_kmem, GFP_NOFS);
119         if (o != NULL) {
120                 struct lu_object *l = &o->opo_obj.do_lu;
121
122                 /* If hdr is NULL, it means the object is not built
123                  * from the top dev(MDT/OST), usually it happens when
124                  * building striped object, like data object on MDT or
125                  * striped object for directory */
126                 if (hdr == NULL) {
127                         struct lu_object_header *h = &o->opo_header;
128
129                         lu_object_header_init(h);
130                         dt_object_init(&o->opo_obj, h, d);
131                         lu_object_add_top(h, l);
132                 } else {
133                         dt_object_init(&o->opo_obj, NULL, d);
134                 }
135
136                 l->lo_ops = &osp_lu_obj_ops;
137
138                 init_rwsem(&o->opo_sem);
139                 INIT_LIST_HEAD(&o->opo_xattr_list);
140                 INIT_LIST_HEAD(&o->opo_invalidate_cb_list);
141                 spin_lock_init(&o->opo_lock);
142                 init_rwsem(&o->opo_invalidate_sem);
143
144                 return l;
145         }
146
147         return NULL;
148 }
149
150 /**
151  * Find or create the local object
152  *
153  * Finds or creates the local file referenced by \a reg_id and return the
154  * attributes of the local file.
155  *
156  * \param[in] env       execution environment
157  * \param[in] osp       OSP device
158  * \param[out] attr     attributes of the object
159  * \param[in] reg_id    the local object ID of the file. It will be used
160  *                      to compose a local FID{FID_SEQ_LOCAL_FILE, reg_id, 0}
161  *                      to identify the object.
162  *
163  * \retval object               object(dt_object) found or created
164  * \retval ERR_PTR(errno)       ERR_PTR(errno) if not get the object.
165  */
166 static struct dt_object
167 *osp_find_or_create_local_file(const struct lu_env *env, struct osp_device *osp,
168                                struct lu_attr *attr, __u32 reg_id)
169 {
170         struct osp_thread_info *osi = osp_env_info(env);
171         struct dt_object_format dof = { 0 };
172         struct dt_object       *dto;
173         int                  rc;
174         ENTRY;
175
176         lu_local_obj_fid(&osi->osi_fid, reg_id);
177         attr->la_valid = LA_MODE;
178         attr->la_mode = S_IFREG | 0644;
179         dof.dof_type = DFT_REGULAR;
180         /* Find or create the local object by osi_fid. */
181         dto = dt_find_or_create(env, osp->opd_storage, &osi->osi_fid,
182                                 &dof, attr);
183         if (IS_ERR(dto))
184                 RETURN(dto);
185
186         /* Get attributes of the local object. */
187         rc = dt_attr_get(env, dto, attr);
188         if (rc) {
189                 CERROR("%s: can't be initialized: rc = %d\n",
190                        osp->opd_obd->obd_name, rc);
191                 dt_object_put(env, dto);
192                 RETURN(ERR_PTR(rc));
193         }
194         RETURN(dto);
195 }
196
197 /**
198  * Write data buffer to a local file object.
199  *
200  * \param[in] env       execution environment
201  * \param[in] osp       OSP device
202  * \param[in] dt_obj    object written to
203  * \param[in] buf       buffer containing byte array and length
204  * \param[in] offset    write offset in the object in bytes
205  *
206  * \retval 0            0 if write succeed
207  * \retval -EFAULT      -EFAULT if only part of buffer is written.
208  * \retval negative             other negative errno if write failed.
209  */
210 static int osp_write_local_file(const struct lu_env *env,
211                                 struct osp_device *osp,
212                                 struct dt_object *dt_obj,
213                                 struct lu_buf *buf,
214                                 loff_t offset)
215 {
216         struct thandle *th;
217         int rc;
218
219         if (osp->opd_storage->dd_rdonly)
220                 RETURN(0);
221
222         th = dt_trans_create(env, osp->opd_storage);
223         if (IS_ERR(th))
224                 RETURN(PTR_ERR(th));
225
226         rc = dt_declare_record_write(env, dt_obj, buf, offset, th);
227         if (rc)
228                 GOTO(out, rc);
229         rc = dt_trans_start_local(env, osp->opd_storage, th);
230         if (rc)
231                 GOTO(out, rc);
232
233         rc = dt_record_write(env, dt_obj, buf, &offset, th);
234 out:
235         dt_trans_stop(env, osp->opd_storage, th);
236         RETURN(rc);
237 }
238
239 /**
240  * Initialize last ID object.
241  *
242  * This function initializes the LAST_ID file, which stores the current last
243  * used id of data objects. The MDT will use the last used id and the last_seq
244  * (\see osp_init_last_seq()) to synchronize the precreate object cache with
245  * OSTs.
246  *
247  * \param[in] env       execution environment
248  * \param[in] osp       OSP device
249  *
250  * \retval 0            0 if initialization succeed
251  * \retval negative     negative errno if initialization failed
252  */
253 static int osp_init_last_objid(const struct lu_env *env, struct osp_device *osp)
254 {
255         struct osp_thread_info  *osi = osp_env_info(env);
256         struct lu_fid           *fid = &osp->opd_last_used_fid;
257         struct dt_object        *dto;
258         int                     rc = -EFAULT;
259         ENTRY;
260
261         dto = osp_find_or_create_local_file(env, osp, &osi->osi_attr,
262                                             MDD_LOV_OBJ_OID);
263         if (IS_ERR(dto))
264                 RETURN(PTR_ERR(dto));
265
266         osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, &osp->opd_last_id,
267                            osp->opd_index);
268
269         /* object will be released in device cleanup path */
270         if (osi->osi_attr.la_size >= (osi->osi_off + osi->osi_lb.lb_len)) {
271                 rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off);
272                 if (rc != 0 && rc != -EFAULT)
273                         GOTO(out, rc);
274                 /* In case of idif bits 32-48 go to f_seq
275                  * (see osp_init_last_seq). So don't care
276                  * about u64->u32 convertion. */
277                 fid->f_oid = osp->opd_last_id;
278         }
279
280         if (rc == -EFAULT) { /* fresh LAST_ID */
281                 osp->opd_last_id = 0;
282                 fid->f_oid = 0;
283                 rc = osp_write_local_file(env, osp, dto, &osi->osi_lb,
284                                           osi->osi_off);
285                 if (rc != 0)
286                         GOTO(out, rc);
287         }
288         osp->opd_last_used_oid_file = dto;
289         RETURN(0);
290 out:
291         /* object will be released in device cleanup path */
292         CERROR("%s: can't initialize lov_objid: rc = %d\n",
293                osp->opd_obd->obd_name, rc);
294         dt_object_put(env, dto);
295         osp->opd_last_used_oid_file = NULL;
296         RETURN(rc);
297 }
298
299 /**
300  * Initialize last sequence object.
301  *
302  * This function initializes the LAST_SEQ file in the local OSD, which stores
303  * the current last used sequence of data objects. The MDT will use the last
304  * sequence and last id (\see osp_init_last_objid()) to synchronize the
305  * precreate object cache with OSTs.
306  *
307  * \param[in] env       execution environment
308  * \param[in] osp       OSP device
309  *
310  * \retval 0            0 if initialization succeed
311  * \retval negative     negative errno if initialization failed
312  */
313 static int osp_init_last_seq(const struct lu_env *env, struct osp_device *osp)
314 {
315         struct osp_thread_info  *osi = osp_env_info(env);
316         struct lu_fid           *fid = &osp->opd_last_used_fid;
317         struct dt_object        *dto;
318         int                     rc = -EFAULT;
319         ENTRY;
320
321         dto = osp_find_or_create_local_file(env, osp, &osi->osi_attr,
322                                             MDD_LOV_OBJ_OSEQ);
323         if (IS_ERR(dto))
324                 RETURN(PTR_ERR(dto));
325
326         osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off, &fid->f_seq,
327                            osp->opd_index);
328
329         /* object will be released in device cleanup path */
330         if (osi->osi_attr.la_size >= (osi->osi_off + osi->osi_lb.lb_len)) {
331                 rc = dt_record_read(env, dto, &osi->osi_lb, &osi->osi_off);
332                 if (rc != 0 && rc != -EFAULT)
333                         GOTO(out, rc);
334                 if (fid_is_idif(fid))
335                         fid->f_seq = fid_idif_seq(osp->opd_last_id,
336                                                   osp->opd_index);
337         }
338
339         if (rc == -EFAULT) { /* fresh OSP */
340                 fid->f_seq = 0;
341                 rc = osp_write_local_file(env, osp, dto, &osi->osi_lb,
342                                           osi->osi_off);
343                 if (rc != 0)
344                         GOTO(out, rc);
345         }
346         osp->opd_last_used_seq_file = dto;
347         RETURN(0);
348 out:
349         /* object will be released in device cleanup path */
350         CERROR("%s: can't initialize lov_seq: rc = %d\n",
351                osp->opd_obd->obd_name, rc);
352         dt_object_put(env, dto);
353         osp->opd_last_used_seq_file = NULL;
354         RETURN(rc);
355 }
356
357 /**
358  * Initialize last OID and sequence object.
359  *
360  * If the MDT is just upgraded to 2.4 from the lower version, where the
361  * LAST_SEQ file does not exist, the file will be created and IDIF sequence
362  * will be written into the file.
363  *
364  * \param[in] env       execution environment
365  * \param[in] osp       OSP device
366  *
367  * \retval 0            0 if initialization succeed
368  * \retval negative     negative error if initialization failed
369  */
370 static int osp_last_used_init(const struct lu_env *env, struct osp_device *osp)
371 {
372         struct osp_thread_info *osi = osp_env_info(env);
373         int                  rc;
374         ENTRY;
375
376         fid_zero(&osp->opd_last_used_fid);
377         rc = osp_init_last_objid(env, osp);
378         if (rc < 0) {
379                 CERROR("%s: Can not get ids %d from old objid!\n",
380                        osp->opd_obd->obd_name, rc);
381                 RETURN(rc);
382         }
383
384         rc = osp_init_last_seq(env, osp);
385         if (rc < 0) {
386                 CERROR("%s: Can not get sequence %d from old objseq!\n",
387                        osp->opd_obd->obd_name, rc);
388                 GOTO(out, rc);
389         }
390
391         if (fid_oid(&osp->opd_last_used_fid) != 0 &&
392             fid_seq(&osp->opd_last_used_fid) == 0) {
393                 /* Just upgrade from the old version,
394                  * set the seq to be IDIF */
395                 osp->opd_last_used_fid.f_seq =
396                    fid_idif_seq(fid_oid(&osp->opd_last_used_fid),
397                                 osp->opd_index);
398                 osp_objseq_buf_prep(&osi->osi_lb, &osi->osi_off,
399                                     &osp->opd_last_used_fid.f_seq,
400                                     osp->opd_index);
401                 rc = osp_write_local_file(env, osp, osp->opd_last_used_seq_file,
402                                           &osi->osi_lb, osi->osi_off);
403                 if (rc) {
404                         CERROR("%s : Can not write seq file: rc = %d\n",
405                                osp->opd_obd->obd_name, rc);
406                         GOTO(out, rc);
407                 }
408         }
409
410         if (!fid_is_zero(&osp->opd_last_used_fid) &&
411                  !fid_is_sane(&osp->opd_last_used_fid)) {
412                 CERROR("%s: Got invalid FID "DFID"\n", osp->opd_obd->obd_name,
413                         PFID(&osp->opd_last_used_fid));
414                 GOTO(out, rc = -EINVAL);
415         }
416
417         osp_fid_to_obdid(&osp->opd_last_used_fid, &osp->opd_last_id);
418         CDEBUG(D_INFO, "%s: Init last used fid "DFID"\n",
419                osp->opd_obd->obd_name, PFID(&osp->opd_last_used_fid));
420 out:
421         if (rc != 0) {
422                 if (osp->opd_last_used_oid_file != NULL) {
423                         dt_object_put(env, osp->opd_last_used_oid_file);
424                         osp->opd_last_used_oid_file = NULL;
425                 }
426                 if (osp->opd_last_used_seq_file != NULL) {
427                         dt_object_put(env, osp->opd_last_used_seq_file);
428                         osp->opd_last_used_seq_file = NULL;
429                 }
430         }
431
432         RETURN(rc);
433 }
434
435 /**
436  * Release the last sequence and OID file objects in OSP device.
437  *
438  * \param[in] env       execution environment
439  * \param[in] osp       OSP device
440  */
441 static void osp_last_used_fini(const struct lu_env *env, struct osp_device *osp)
442 {
443         /* release last_used file */
444         if (osp->opd_last_used_oid_file != NULL) {
445                 dt_object_put(env, osp->opd_last_used_oid_file);
446                 osp->opd_last_used_oid_file = NULL;
447         }
448
449         if (osp->opd_last_used_seq_file != NULL) {
450                 dt_object_put(env, osp->opd_last_used_seq_file);
451                 osp->opd_last_used_seq_file = NULL;
452         }
453 }
454
455 /**
456  * Disconnects the connection between OSP and its correspondent MDT or OST, and
457  * the import will be marked as inactive. It will only be called during OSP
458  * cleanup process.
459  *
460  * \param[in] d         OSP device being disconnected
461  *
462  * \retval 0            0 if disconnection succeed
463  * \retval negative     negative errno if disconnection failed
464  */
465 static int osp_disconnect(struct osp_device *d)
466 {
467         struct obd_device *obd = d->opd_obd;
468         struct obd_import *imp;
469         int rc = 0;
470
471         imp = obd->u.cli.cl_import;
472
473         /* Mark import deactivated now, so we don't try to reconnect if any
474          * of the cleanup RPCs fails (e.g. ldlm cancel, etc).  We don't
475          * fully deactivate the import, or that would drop all requests. */
476         LASSERT(imp != NULL);
477         spin_lock(&imp->imp_lock);
478         imp->imp_deactive = 1;
479         spin_unlock(&imp->imp_lock);
480
481         ptlrpc_deactivate_import(imp);
482
483         /* Some non-replayable imports (MDS's OSCs) are pinged, so just
484          * delete it regardless.  (It's safe to delete an import that was
485          * never added.) */
486         (void)ptlrpc_pinger_del_import(imp);
487
488         /* Send disconnect on healthy import, do force disconnect otherwise */
489         spin_lock(&imp->imp_lock);
490         imp->imp_obd->obd_force = imp->imp_state != LUSTRE_IMP_FULL;
491         spin_unlock(&imp->imp_lock);
492
493         rc = ptlrpc_disconnect_import(imp, 0);
494         if (rc != 0)
495                 CERROR("%s: can't disconnect: rc = %d\n", obd->obd_name, rc);
496
497         ptlrpc_invalidate_import(imp);
498
499         RETURN(rc);
500 }
501
502 /**
503  * Initialize the osp_update structure in OSP device
504  *
505  * Allocate osp update structure and start update thread.
506  *
507  * \param[in] osp       OSP device
508  *
509  * \retval              0 if initialization succeeds.
510  * \retval              negative errno if initialization fails.
511  */
512 static int osp_update_init(struct osp_device *osp)
513 {
514         struct task_struct *task;
515         int rc;
516
517         ENTRY;
518
519         LASSERT(osp->opd_connect_mdt);
520
521         if (osp->opd_storage->dd_rdonly)
522                 RETURN(0);
523
524         OBD_ALLOC_PTR(osp->opd_update);
525         if (osp->opd_update == NULL)
526                 RETURN(-ENOMEM);
527
528         init_waitqueue_head(&osp->opd_update->ou_waitq);
529         spin_lock_init(&osp->opd_update->ou_lock);
530         INIT_LIST_HEAD(&osp->opd_update->ou_list);
531         osp->opd_update->ou_rpc_version = 1;
532         osp->opd_update->ou_version = 1;
533         osp->opd_update->ou_generation = 0;
534
535         rc = lu_env_init(&osp->opd_update->ou_env,
536                          osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
537         if (rc < 0) {
538                 CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
539                        rc);
540                 OBD_FREE_PTR(osp->opd_update);
541                 osp->opd_update = NULL;
542                 RETURN(rc);
543         }
544         /* start thread handling sending updates to the remote MDT */
545         task = kthread_create(osp_send_update_thread, osp,
546                               "osp_up%u-%u", osp->opd_index, osp->opd_group);
547         if (IS_ERR(task)) {
548                 int rc = PTR_ERR(task);
549
550                 lu_env_fini(&osp->opd_update->ou_env);
551                 OBD_FREE_PTR(osp->opd_update);
552                 osp->opd_update = NULL;
553                 CERROR("%s: can't start precreate thread: rc = %d\n",
554                        osp->opd_obd->obd_name, rc);
555                 RETURN(rc);
556         }
557
558         osp->opd_update->ou_update_task = task;
559         wake_up_process(task);
560
561         RETURN(0);
562 }
563
564 /**
565  * Finialize osp_update structure in OSP device
566  *
567  * Stop the OSP update sending thread, then delete the left
568  * osp thandle in the sending list.
569  *
570  * \param [in] osp      OSP device.
571  */
572 static void osp_update_fini(const struct lu_env *env, struct osp_device *osp)
573 {
574         struct obd_import *imp = osp->opd_obd->u.cli.cl_import;
575         struct osp_update_request *our;
576         struct osp_update_request *tmp;
577         struct osp_updates *ou = osp->opd_update;
578
579         if (ou == NULL)
580                 return;
581
582         kthread_stop(ou->ou_update_task);
583         lu_env_fini(&ou->ou_env);
584
585         /*
586          * import invalidation can be running in a dedicated thread.
587          * we have to wait for the thread's completion as the thread
588          * invalidates this list as well.
589          */
590         wait_event_idle(imp->imp_recovery_waitq,
591                         (atomic_read(&imp->imp_inval_count) == 0));
592
593         /* Remove the left osp thandle from the list */
594         list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
595                 list_del_init(&our->our_list);
596                 LASSERT(our->our_th != NULL);
597                 osp_trans_callback(env, our->our_th, -EIO);
598                 /* our will be destroyed in osp_thandle_put() */
599                 osp_thandle_put(env, our->our_th);
600         }
601
602         OBD_FREE_PTR(ou);
603         osp->opd_update = NULL;
604 }
605
606 /**
607  * Cleanup OSP, which includes disconnect import, cleanup unlink log, stop
608  * precreate threads etc.
609  *
610  * \param[in] env       execution environment.
611  * \param[in] d         OSP device being disconnected.
612  *
613  * \retval 0            0 if cleanup succeed
614  * \retval negative     negative errno if cleanup failed
615  */
616 static int osp_shutdown(const struct lu_env *env, struct osp_device *d)
617 {
618         int                      rc = 0;
619         ENTRY;
620
621         LASSERT(env);
622
623         rc = osp_disconnect(d);
624
625         osp_statfs_fini(d);
626
627         if (!d->opd_connect_mdt) {
628                 /* stop sync thread */
629                 osp_sync_fini(d);
630
631                 /* stop precreate thread */
632                 osp_precreate_fini(d);
633
634                 /* release last_used file */
635                 osp_last_used_fini(env, d);
636         }
637
638         obd_fid_fini(d->opd_obd);
639
640         RETURN(rc);
641 }
642
643 /**
644  * Implementation of osp_lu_ops::ldo_process_config
645  *
646  * This function processes config log records in OSP layer. It is usually
647  * called from the top layer of MDT stack, and goes through the stack by calling
648  * ldo_process_config of next layer.
649  *
650  * \param[in] env       execution environment
651  * \param[in] dev       lu_device of OSP
652  * \param[in] lcfg      config log
653  *
654  * \retval 0            0 if the config log record is executed correctly.
655  * \retval negative     negative errno if the record execution fails.
656  */
657 static int osp_process_config(const struct lu_env *env,
658                               struct lu_device *dev, struct lustre_cfg *lcfg)
659 {
660         struct osp_device *d = lu2osp_dev(dev);
661         struct dt_device *dt = lu2dt_dev(dev);
662         struct obd_device *obd = d->opd_obd;
663         ssize_t count;
664         int rc;
665
666         ENTRY;
667
668         switch (lcfg->lcfg_command) {
669         case LCFG_PRE_CLEANUP:
670                 rc = osp_disconnect(d);
671                 osp_update_fini(env, d);
672                 break;
673         case LCFG_CLEANUP:
674                 /*
675                  * cleanup ldlm so that PRE_CLEANUP phase doesn't block
676                  * awaiting for locks held by MDT threads awaiting for
677                  * all OSPs to interrupt their in-flight RPCs
678                  */
679                 if (obd->obd_namespace != NULL)
680                         ldlm_namespace_free_prior(obd->obd_namespace, NULL, 1);
681                 lu_dev_del_linkage(dev->ld_site, dev);
682                 rc = osp_shutdown(env, d);
683                 break;
684         case LCFG_PARAM:
685                 count = class_modify_config(lcfg, d->opd_connect_mdt ?
686                                                   PARAM_OSP : PARAM_OSC,
687                                             &dt->dd_kobj);
688                 if (count < 0) {
689                         /* class_modify_config() haven't found matching
690                          * parameter and returned an error so that layer(s)
691                          * below could use that. But OSP is the bottom, so
692                          * just ignore it
693                          */
694                         CERROR("%s: unknown param %s\n",
695                                (char *)lustre_cfg_string(lcfg, 0),
696                                (char *)lustre_cfg_string(lcfg, 1));
697                 }
698                 rc = 0;
699                 break;
700         default:
701                 CERROR("%s: unknown command %u\n",
702                        (char *)lustre_cfg_string(lcfg, 0), lcfg->lcfg_command);
703                 rc = 0;
704                 break;
705         }
706
707         RETURN(rc);
708 }
709
710 /**
711  * Implementation of osp_lu_ops::ldo_recovery_complete
712  *
713  * This function is called after recovery is finished, and OSP layer
714  * will wake up precreate thread here.
715  *
716  * \param[in] env       execution environment
717  * \param[in] dev       lu_device of OSP
718  *
719  * \retval 0            0 unconditionally
720  */
721 static int osp_recovery_complete(const struct lu_env *env,
722                                  struct lu_device *dev)
723 {
724         struct osp_device       *osp = lu2osp_dev(dev);
725
726         ENTRY;
727         osp->opd_recovery_completed = 1;
728
729         if (!osp->opd_connect_mdt && osp->opd_pre != NULL)
730                 wake_up(&osp->opd_pre_waitq);
731
732         RETURN(0);
733 }
734
735 /**
736  * Implementation of lu_device_operations::ldo_fid_alloc() for OSP
737  *
738  * Allocate FID from remote MDT.
739  *
740  * see include/lu_object.h for the details.
741  */
742 static int osp_fid_alloc(const struct lu_env *env, struct lu_device *d,
743                          struct lu_fid *fid, struct lu_object *parent,
744                          const struct lu_name *name)
745 {
746         struct osp_device *osp = lu2osp_dev(d);
747         struct client_obd *cli = &osp->opd_obd->u.cli;
748         struct lu_client_seq *seq = cli->cl_seq;
749         int rc;
750
751         ENTRY;
752
753         /* Sigh, fid client is not ready yet */
754         if (!osp->opd_obd->u.cli.cl_seq)
755                 RETURN(-ENOTCONN);
756
757         if (!osp->opd_obd->u.cli.cl_seq->lcs_exp)
758                 RETURN(-ENOTCONN);
759
760         rc = seq_client_alloc_fid(env, seq, fid);
761
762         RETURN(rc);
763 }
764
765 const struct lu_device_operations osp_lu_ops = {
766         .ldo_object_alloc       = osp_object_alloc,
767         .ldo_process_config     = osp_process_config,
768         .ldo_recovery_complete  = osp_recovery_complete,
769         .ldo_fid_alloc          = osp_fid_alloc,
770 };
771
772 /**
773  * Implementation of dt_device_operations::dt_statfs
774  *
775  * This function provides statfs status (for precreation) from
776  * corresponding OST. Note: this function only retrieves the status
777  * from the OSP device, and the real statfs RPC happens inside
778  * precreate thread (\see osp_statfs_update). Note: OSP for MDT does
779  * not need to retrieve statfs data for now.
780  *
781  * \param[in] env       execution environment.
782  * \param[in] dev       dt_device of OSP.
783  * \param[out] sfs      holds the retrieved statfs data.
784  *
785  * \retval 0            0 statfs data was retrieved successfully or
786  *                      retrieval was not needed
787  * \retval negative     negative errno if get statfs failed.
788  */
789 static int osp_statfs(const struct lu_env *env, struct dt_device *dev,
790                       struct obd_statfs *sfs, struct obd_statfs_info *info)
791 {
792         struct osp_device *d = dt2osp_dev(dev);
793         struct obd_import *imp = d->opd_obd->u.cli.cl_import;
794
795         ENTRY;
796
797         if (imp->imp_state == LUSTRE_IMP_CLOSED)
798                 RETURN(-ESHUTDOWN);
799
800         if (unlikely(d->opd_imp_active == 0))
801                 RETURN(-ENOTCONN);
802
803         /* return recently updated data */
804         *sfs = d->opd_statfs;
805         if (info) {
806                 info->os_reserved_mb_low = d->opd_reserved_mb_low;
807                 info->os_reserved_mb_high = d->opd_reserved_mb_high;
808         }
809
810         CDEBUG(D_OTHER, "%s: %llu blocks, %llu free, %llu avail, "
811                "%u bsize, %u reserved mb low, %u reserved mb high, "
812                "%llu files, %llu free files\n", d->opd_obd->obd_name,
813                sfs->os_blocks, sfs->os_bfree, sfs->os_bavail, sfs->os_bsize,
814                d->opd_reserved_mb_low, d->opd_reserved_mb_high,
815                sfs->os_files, sfs->os_ffree);
816
817         if (d->opd_pre == NULL || (info && !info->os_enable_pre))
818                 RETURN(0);
819
820         /*
821          * The layer above osp (usually lod) can use f_precreated to
822          * estimate how many objects are available for immediate usage.
823          */
824         spin_lock(&d->opd_pre_lock);
825         sfs->os_fprecreated = osp_fid_diff(&d->opd_pre_last_created_fid,
826                                            &d->opd_pre_used_fid);
827         sfs->os_fprecreated -= d->opd_pre_reserved;
828         LASSERTF(sfs->os_fprecreated <= OST_MAX_PRECREATE * 2,
829                  "last_created "DFID", next_fid "DFID", reserved %llu\n",
830                  PFID(&d->opd_pre_last_created_fid), PFID(&d->opd_pre_used_fid),
831                  d->opd_pre_reserved);
832         spin_unlock(&d->opd_pre_lock);
833         RETURN(0);
834 }
835
836 /**
837  * Implementation of dt_device_operations::dt_sync
838  *
839  * This function synchronizes the OSP cache to the remote target. It wakes
840  * up unlink log threads and sends out unlink records to the remote OST.
841  *
842  * \param[in] env       execution environment
843  * \param[in] dev       dt_device of OSP
844  *
845  * \retval 0            0 if synchronization succeeds
846  * \retval negative     negative errno if synchronization fails
847  */
848 static int osp_sync(const struct lu_env *env, struct dt_device *dev)
849 {
850         struct osp_device *d = dt2osp_dev(dev);
851         time64_t start = ktime_get_seconds();
852         int recs, rc = 0;
853         u64 old;
854
855         ENTRY;
856
857         /* No Sync between MDTs yet. */
858         if (d->opd_connect_mdt)
859                 RETURN(0);
860
861         recs = atomic_read(&d->opd_sync_changes);
862         old = atomic64_read(&d->opd_sync_processed_recs);
863
864         osp_sync_force(env, dt2osp_dev(dev));
865
866         if (unlikely(d->opd_imp_active == 0))
867                 RETURN(-ENOTCONN);
868
869         down_write(&d->opd_async_updates_rwsem);
870
871         CDEBUG(D_OTHER, "%s: async updates %d\n", d->opd_obd->obd_name,
872                atomic_read(&d->opd_async_updates_count));
873
874         /* make sure the connection is fine */
875         rc = wait_event_idle_timeout(
876                 d->opd_sync_barrier_waitq,
877                 atomic_read(&d->opd_async_updates_count) == 0,
878                 cfs_time_seconds(obd_timeout));
879         if (rc > 0)
880                 rc = 0;
881         else if (rc == 0)
882                 rc = -ETIMEDOUT;
883
884         up_write(&d->opd_async_updates_rwsem);
885         if (rc != 0)
886                 GOTO(out, rc);
887
888         CDEBUG(D_CACHE, "%s: processed %llu\n", d->opd_obd->obd_name,
889                (unsigned long long)atomic64_read(&d->opd_sync_processed_recs));
890
891         while (atomic64_read(&d->opd_sync_processed_recs) < old + recs) {
892                 __u64 last = atomic64_read(&d->opd_sync_processed_recs);
893                 /* make sure the connection is fine */
894                 wait_event_idle_timeout(
895                         d->opd_sync_barrier_waitq,
896                         atomic64_read(&d->opd_sync_processed_recs)
897                              >= old + recs,
898                         cfs_time_seconds(obd_timeout));
899
900                 if (atomic64_read(&d->opd_sync_processed_recs) >= old + recs)
901                         break;
902
903                 if (atomic64_read(&d->opd_sync_processed_recs) != last) {
904                         /* some progress have been made,
905                          * keep trying... */
906                         continue;
907                 }
908
909                 /* no changes and expired, something is wrong */
910                 GOTO(out, rc = -ETIMEDOUT);
911         }
912
913         /* block new processing (barrier>0 - few callers are possible */
914         atomic_inc(&d->opd_sync_barrier);
915
916         CDEBUG(D_CACHE, "%s: %u in flight\n", d->opd_obd->obd_name,
917                atomic_read(&d->opd_sync_rpcs_in_flight));
918
919         /* wait till all-in-flight are replied, so executed by the target */
920         /* XXX: this is used by LFSCK at the moment, which doesn't require
921          *      all the changes to be committed, but in general it'd be
922          *      better to wait till commit */
923         while (atomic_read(&d->opd_sync_rpcs_in_flight) > 0) {
924                 old = atomic_read(&d->opd_sync_rpcs_in_flight);
925
926                 wait_event_idle_timeout(
927                         d->opd_sync_barrier_waitq,
928                         atomic_read(&d->opd_sync_rpcs_in_flight) == 0,
929                         cfs_time_seconds(obd_timeout));
930
931                 if (atomic_read(&d->opd_sync_rpcs_in_flight) == 0)
932                         break;
933
934                 if (atomic_read(&d->opd_sync_rpcs_in_flight) != old) {
935                         /* some progress have been made */
936                         continue;
937                 }
938
939                 /* no changes and expired, something is wrong */
940                 GOTO(out, rc = -ETIMEDOUT);
941         }
942
943 out:
944         /* resume normal processing (barrier=0) */
945         atomic_dec(&d->opd_sync_barrier);
946         osp_sync_check_for_work(d);
947
948         CDEBUG(D_CACHE, "%s: done in %lld: rc = %d\n", d->opd_obd->obd_name,
949                ktime_get_seconds() - start, rc);
950
951         RETURN(rc);
952 }
953
954 static const struct dt_device_operations osp_dt_ops = {
955         .dt_statfs       = osp_statfs,
956         .dt_sync         = osp_sync,
957         .dt_trans_create = osp_trans_create,
958         .dt_trans_start  = osp_trans_start,
959         .dt_trans_stop   = osp_trans_stop,
960         .dt_trans_cb_add   = osp_trans_cb_add,
961 };
962
963 /**
964  * Connect OSP to local OSD.
965  *
966  * Locate the local OSD referenced by \a nextdev and connect to it. Sometimes,
967  * OSP needs to access the local OSD to store some information. For example,
968  * during precreate, it needs to update last used OID and sequence file
969  * (LAST_SEQ) in local OSD.
970  *
971  * \param[in] env       execution environment
972  * \param[in] osp       OSP device
973  * \param[in] nextdev   the name of local OSD
974  *
975  * \retval 0            0 connection succeeded
976  * \retval negative     negative errno connection failed
977  */
978 static int osp_connect_to_osd(const struct lu_env *env, struct osp_device *osp,
979                               const char *nextdev)
980 {
981         struct obd_connect_data *data = NULL;
982         struct obd_device       *obd;
983         int                      rc;
984
985         ENTRY;
986
987         LASSERT(osp->opd_storage_exp == NULL);
988
989         OBD_ALLOC_PTR(data);
990         if (data == NULL)
991                 RETURN(-ENOMEM);
992
993         obd = class_name2obd(nextdev);
994         if (obd == NULL) {
995                 CERROR("%s: can't locate next device: %s\n",
996                        osp->opd_obd->obd_name, nextdev);
997                 GOTO(out, rc = -ENOTCONN);
998         }
999
1000         rc = obd_connect(env, &osp->opd_storage_exp, obd, &obd->obd_uuid, data,
1001                          NULL);
1002         if (rc) {
1003                 CERROR("%s: cannot connect to next dev %s: rc = %d\n",
1004                        osp->opd_obd->obd_name, nextdev, rc);
1005                 GOTO(out, rc);
1006         }
1007
1008         osp->opd_dt_dev.dd_lu_dev.ld_site =
1009                 osp->opd_storage_exp->exp_obd->obd_lu_dev->ld_site;
1010         LASSERT(osp->opd_dt_dev.dd_lu_dev.ld_site);
1011         osp->opd_storage = lu2dt_dev(osp->opd_storage_exp->exp_obd->obd_lu_dev);
1012
1013 out:
1014         OBD_FREE_PTR(data);
1015         RETURN(rc);
1016 }
1017
1018 /**
1019  * Determine if the lock needs to be cancelled
1020  *
1021  * Determine if the unused lock should be cancelled before replay, see
1022  * (ldlm_cancel_no_wait_policy()). Currently, only inode bits lock exists
1023  * between MDTs.
1024  *
1025  * \param[in] lock      lock to be checked.
1026  *
1027  * \retval              1 if the lock needs to be cancelled before replay.
1028  * \retval              0 if the lock does not need to be cancelled before
1029  *                      replay.
1030  */
1031 static int osp_cancel_weight(struct ldlm_lock *lock)
1032 {
1033         if (lock->l_resource->lr_type != LDLM_IBITS)
1034                 RETURN(0);
1035
1036         RETURN(1);
1037 }
1038
1039 /**
1040  * Initialize OSP device according to the parameters in the configuration
1041  * log \a cfg.
1042  *
1043  * Reconstruct the local device name from the configuration profile, and
1044  * initialize necessary threads and structures according to the OSP type
1045  * (MDT or OST).
1046  *
1047  * Since there is no record in the MDT configuration for the local disk
1048  * device, we have to extract this from elsewhere in the profile.
1049  * The only information we get at setup is from the OSC records:
1050  * setup 0:{fsname}-OSTxxxx-osc[-MDTxxxx] 1:lustre-OST0000_UUID 2:NID
1051  *
1052  * Note: configs generated by Lustre 1.8 are missing the -MDTxxxx part,
1053  * so, we need to reconstruct the name of the underlying OSD from this:
1054  * {fsname}-{svname}-osd, for example "lustre-MDT0000-osd".
1055  *
1056  * \param[in] env       execution environment
1057  * \param[in] osp       OSP device
1058  * \param[in] ldt       lu device type of OSP
1059  * \param[in] cfg       configuration log
1060  *
1061  * \retval 0            0 if OSP initialization succeeded.
1062  * \retval negative     negative errno if OSP initialization failed.
1063  */
1064 static int osp_init0(const struct lu_env *env, struct osp_device *osp,
1065                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
1066 {
1067         struct obd_device       *obd;
1068         struct obd_import       *imp;
1069         char *src, *tgt, *osdname = NULL;
1070         const char *mdt;
1071         int                     rc;
1072         u32 idx;
1073
1074         ENTRY;
1075
1076         mutex_init(&osp->opd_async_requests_mutex);
1077         INIT_LIST_HEAD(&osp->opd_async_updates);
1078         init_rwsem(&osp->opd_async_updates_rwsem);
1079         atomic_set(&osp->opd_async_updates_count, 0);
1080
1081         obd = class_name2obd(lustre_cfg_string(cfg, 0));
1082         if (obd == NULL) {
1083                 CERROR("Cannot find obd with name %s\n",
1084                        lustre_cfg_string(cfg, 0));
1085                 RETURN(-ENODEV);
1086         }
1087         osp->opd_obd = obd;
1088
1089         src = lustre_cfg_string(cfg, 0);
1090         if (src == NULL)
1091                 RETURN(-EINVAL);
1092
1093         tgt = strrchr(src, '-');
1094         if (tgt == NULL) {
1095                 CERROR("%s: invalid target name %s: rc = %d\n",
1096                        osp->opd_obd->obd_name, lustre_cfg_string(cfg, 0),
1097                        -EINVAL);
1098                 RETURN(-EINVAL);
1099         }
1100
1101         if (strncmp(tgt, "-osc", 4) == 0) {
1102                 /* Old OSC name fsname-OSTXXXX-osc */
1103                 for (tgt--; tgt > src && *tgt != '-'; tgt--)
1104                         ;
1105                 if (tgt == src) {
1106                         CERROR("%s: invalid target name %s: rc = %d\n",
1107                                osp->opd_obd->obd_name,
1108                                lustre_cfg_string(cfg, 0), -EINVAL);
1109                         RETURN(-EINVAL);
1110                 }
1111
1112                 if (strncmp(tgt, "-OST", 4) != 0) {
1113                         CERROR("%s: invalid target name %s: rc = %d\n",
1114                                osp->opd_obd->obd_name,
1115                                lustre_cfg_string(cfg, 0), -EINVAL);
1116                         RETURN(-EINVAL);
1117                 }
1118
1119                 rc = target_name2index(tgt + 1, &idx, &mdt);
1120                 if (rc < 0 || rc & LDD_F_SV_ALL || mdt[0] != '-') {
1121                         CERROR("%s: invalid OST index in '%s': rc = %d\n",
1122                                osp->opd_obd->obd_name, src, -EINVAL);
1123                         RETURN(-EINVAL);
1124                 }
1125                 osp->opd_index = idx;
1126                 osp->opd_group = 0;
1127                 idx = tgt - src;
1128         } else {
1129                 /* New OSC name fsname-OSTXXXX-osc-MDTXXXX */
1130                 if (strncmp(tgt, "-MDT", 4) != 0 &&
1131                     strncmp(tgt, "-OST", 4) != 0) {
1132                         CERROR("%s: invalid target name %s: rc = %d\n",
1133                                osp->opd_obd->obd_name,
1134                                lustre_cfg_string(cfg, 0), -EINVAL);
1135                         RETURN(-EINVAL);
1136                 }
1137
1138                 rc = target_name2index(tgt + 1, &idx, &mdt);
1139                 if (rc < 0 || rc & LDD_F_SV_ALL || *mdt != '\0') {
1140                         CERROR("%s: invalid OST index in '%s': rc = %d\n",
1141                                osp->opd_obd->obd_name, src, -EINVAL);
1142                         RETURN(-EINVAL);
1143                 }
1144
1145                 /* Get MDT index from the name and set it to opd_group,
1146                  * which will be used by OSP to connect with OST */
1147                 osp->opd_group = idx;
1148                 if (tgt - src <= 12) {
1149                         CERROR("%s: invalid mdt index from %s: rc =%d\n",
1150                                osp->opd_obd->obd_name,
1151                                lustre_cfg_string(cfg, 0), -EINVAL);
1152                         RETURN(-EINVAL);
1153                 }
1154
1155                 if (strncmp(tgt - 12, "-MDT", 4) == 0)
1156                         osp->opd_connect_mdt = 1;
1157
1158                 rc = target_name2index(tgt - 11, &idx, &mdt);
1159                 if (rc < 0 || rc & LDD_F_SV_ALL || mdt[0] != '-') {
1160                         CERROR("%s: invalid OST index in '%s': rc =%d\n",
1161                                osp->opd_obd->obd_name, src, -EINVAL);
1162                         RETURN(-EINVAL);
1163                 }
1164
1165                 osp->opd_index = idx;
1166                 idx = tgt - src - 12;
1167         }
1168         /* check the fsname length, and after this everything else will fit */
1169         if (idx > MTI_NAME_MAXLEN) {
1170                 CERROR("%s: fsname too long in '%s': rc = %d\n",
1171                        osp->opd_obd->obd_name, src, -EINVAL);
1172                 RETURN(-EINVAL);
1173         }
1174
1175         OBD_ALLOC(osdname, MAX_OBD_NAME);
1176         if (osdname == NULL)
1177                 RETURN(-ENOMEM);
1178
1179         memcpy(osdname, src, idx); /* copy just the fsname part */
1180         osdname[idx] = '\0';
1181
1182         mdt = strstr(mdt, "-MDT");
1183         if (mdt == NULL) /* 1.8 configs don't have "-MDT0000" at the end */
1184                 strcat(osdname, "-MDT0000");
1185         else
1186                 strcat(osdname, mdt);
1187         strcat(osdname, "-osd");
1188         CDEBUG(D_HA, "%s: connect to %s (%s)\n", obd->obd_name, osdname, src);
1189
1190         osp->opd_dt_dev.dd_lu_dev.ld_ops = &osp_lu_ops;
1191         osp->opd_dt_dev.dd_ops = &osp_dt_ops;
1192
1193         obd->obd_lu_dev = &osp->opd_dt_dev.dd_lu_dev;
1194
1195         rc = osp_connect_to_osd(env, osp, osdname);
1196         if (rc)
1197                 GOTO(out_fini, rc);
1198
1199         rc = ptlrpcd_addref();
1200         if (rc)
1201                 GOTO(out_disconnect, rc);
1202
1203         rc = client_obd_setup(obd, cfg);
1204         if (rc) {
1205                 CERROR("%s: can't setup obd: rc = %d\n", osp->opd_obd->obd_name,
1206                        rc);
1207                 GOTO(out_ref, rc);
1208         }
1209
1210         osp_tunables_init(osp);
1211
1212         rc = obd_fid_init(osp->opd_obd, NULL, osp->opd_connect_mdt ?
1213                           LUSTRE_SEQ_METADATA : LUSTRE_SEQ_DATA);
1214         if (rc) {
1215                 CERROR("%s: fid init error: rc = %d\n",
1216                        osp->opd_obd->obd_name, rc);
1217                 GOTO(out_proc, rc);
1218         }
1219
1220         if (!osp->opd_connect_mdt) {
1221                 /* Initialize last id from the storage - will be
1222                  * used in orphan cleanup. */
1223                 if (!osp->opd_storage->dd_rdonly) {
1224                         rc = osp_last_used_init(env, osp);
1225                         if (rc)
1226                                 GOTO(out_fid, rc);
1227                 }
1228
1229                 /* Initialize precreation thread, it handles new
1230                  * connections as well. */
1231                 rc = osp_init_precreate(osp);
1232                 if (rc)
1233                         GOTO(out_last_used, rc);
1234
1235                 /*
1236                  * Initialize synhronization mechanism taking
1237                  * care of propogating changes to OST in near
1238                  * transactional manner.
1239                  */
1240                 rc = osp_sync_init(env, osp);
1241                 if (rc < 0)
1242                         GOTO(out_precreat, rc);
1243         } else {
1244                 osp->opd_got_disconnected = 1;
1245                 rc = osp_update_init(osp);
1246                 if (rc != 0)
1247                         GOTO(out_fid, rc);
1248         }
1249
1250         rc = osp_init_statfs(osp);
1251         if (rc)
1252                 GOTO(out_precreat, rc);
1253
1254         ns_register_cancel(obd->obd_namespace, osp_cancel_weight);
1255
1256         /*
1257          * Initiate connect to OST
1258          */
1259         imp = obd->u.cli.cl_import;
1260
1261         rc = ptlrpc_init_import(imp);
1262         if (rc)
1263                 GOTO(out, rc);
1264         if (osdname)
1265                 OBD_FREE(osdname, MAX_OBD_NAME);
1266         init_waitqueue_head(&osp->opd_out_waitq);
1267         RETURN(0);
1268
1269 out:
1270         if (!osp->opd_connect_mdt)
1271                 /* stop sync thread */
1272                 osp_sync_fini(osp);
1273 out_precreat:
1274         /* stop precreate thread */
1275         if (!osp->opd_connect_mdt)
1276                 osp_precreate_fini(osp);
1277         else
1278                 osp_update_fini(env, osp);
1279 out_last_used:
1280         if (!osp->opd_connect_mdt)
1281                 osp_last_used_fini(env, osp);
1282 out_fid:
1283         obd_fid_fini(osp->opd_obd);
1284 out_proc:
1285         osp_tunables_fini(osp);
1286         client_obd_cleanup(obd);
1287 out_ref:
1288         ptlrpcd_decref();
1289 out_disconnect:
1290         obd_disconnect(osp->opd_storage_exp);
1291 out_fini:
1292         if (osdname)
1293                 OBD_FREE(osdname, MAX_OBD_NAME);
1294         RETURN(rc);
1295 }
1296
1297 /**
1298  * Implementation of lu_device_type_operations::ldto_device_free
1299  *
1300  * Free the OSP device in memory.  No return value is needed for now,
1301  * so always return NULL to comply with the interface.
1302  *
1303  * \param[in] env       execution environment
1304  * \param[in] lu        lu_device of OSP
1305  *
1306  * \retval NULL         NULL unconditionally
1307  */
1308 static struct lu_device *osp_device_free(const struct lu_env *env,
1309                                          struct lu_device *lu)
1310 {
1311         struct osp_device *osp = lu2osp_dev(lu);
1312
1313         lu_site_print(env, lu->ld_site, &lu->ld_ref, D_ERROR,
1314                       lu_cdebug_printer);
1315         dt_device_fini(&osp->opd_dt_dev);
1316         OBD_FREE_PTR(osp);
1317
1318         return NULL;
1319 }
1320
1321 /**
1322  * Implementation of lu_device_type_operations::ldto_device_alloc
1323  *
1324  * This function allocates and initializes OSP device in memory according to
1325  * the config log.
1326  *
1327  * \param[in] env       execution environment
1328  * \param[in] type      device type of OSP
1329  * \param[in] lcfg      config log
1330  *
1331  * \retval pointer              the pointer of allocated OSP if succeed.
1332  * \retval ERR_PTR(errno)       ERR_PTR(errno) if failed.
1333  */
1334 static struct lu_device *osp_device_alloc(const struct lu_env *env,
1335                                           struct lu_device_type *type,
1336                                           struct lustre_cfg *lcfg)
1337 {
1338         struct osp_device *osp;
1339         struct lu_device  *ld;
1340
1341         OBD_ALLOC_PTR(osp);
1342         if (osp == NULL) {
1343                 ld = ERR_PTR(-ENOMEM);
1344         } else {
1345                 int rc;
1346
1347                 ld = osp2lu_dev(osp);
1348                 dt_device_init(&osp->opd_dt_dev, type);
1349                 rc = osp_init0(env, osp, type, lcfg);
1350                 if (rc != 0) {
1351                         osp_device_free(env, ld);
1352                         ld = ERR_PTR(rc);
1353                 }
1354         }
1355         return ld;
1356 }
1357
1358 /**
1359  * Implementation of lu_device_type_operations::ldto_device_fini
1360  *
1361  * This function cleans up the OSP device, i.e. release and free those
1362  * attached items in osp_device.
1363  *
1364  * \param[in] env       execution environment
1365  * \param[in] ld        lu_device of OSP
1366  *
1367  * \retval NULL                 NULL if cleanup succeeded.
1368  * \retval ERR_PTR(errno)       ERR_PTR(errno) if cleanup failed.
1369  */
1370 static struct lu_device *osp_device_fini(const struct lu_env *env,
1371                                          struct lu_device *ld)
1372 {
1373         struct osp_device *osp = lu2osp_dev(ld);
1374         int                rc;
1375
1376         ENTRY;
1377
1378         if (osp->opd_async_requests != NULL) {
1379                 osp_update_request_destroy(env, osp->opd_async_requests);
1380                 osp->opd_async_requests = NULL;
1381         }
1382
1383         if (osp->opd_storage_exp) {
1384                 /* wait for the commit callbacks to complete */
1385                 wait_event(osp->opd_sync_waitq,
1386                           atomic_read(&osp->opd_commits_registered) == 0);
1387                 obd_disconnect(osp->opd_storage_exp);
1388         }
1389
1390         LASSERT(osp->opd_obd);
1391
1392         rc = client_obd_cleanup(osp->opd_obd);
1393         if (rc != 0) {
1394                 ptlrpcd_decref();
1395                 RETURN(ERR_PTR(rc));
1396         }
1397
1398         osp_tunables_fini(osp);
1399
1400         ptlrpcd_decref();
1401
1402         RETURN(NULL);
1403 }
1404
1405 /**
1406  * Implementation of obd_ops::o_reconnect
1407  *
1408  * This function is empty and does not need to do anything for now.
1409  */
1410 static int osp_reconnect(const struct lu_env *env,
1411                          struct obd_export *exp, struct obd_device *obd,
1412                          struct obd_uuid *cluuid,
1413                          struct obd_connect_data *data,
1414                          void *localdata)
1415 {
1416         return 0;
1417 }
1418
1419 /*
1420  * Implementation of obd_ops::o_connect
1421  *
1422  * Connect OSP to the remote target (MDT or OST). Allocate the
1423  * export and return it to the LOD, which calls this function
1424  * for each OSP to connect it to the remote target. This function
1425  * is currently only called once per OSP.
1426  *
1427  * \param[in] env       execution environment
1428  * \param[out] exp      export connected to OSP
1429  * \param[in] obd       OSP device
1430  * \param[in] cluuid    OSP device client uuid
1431  * \param[in] data      connect_data to be used to connect to the remote
1432  *                      target
1433  * \param[in] localdata necessary for the API interface, but not used in
1434  *                      this function
1435  *
1436  * \retval 0            0 if the connection succeeded.
1437  * \retval negative     negative errno if the connection failed.
1438  */
1439 static int osp_obd_connect(const struct lu_env *env, struct obd_export **exp,
1440                            struct obd_device *obd, struct obd_uuid *cluuid,
1441                            struct obd_connect_data *data, void *localdata)
1442 {
1443         struct osp_device *osp = lu2osp_dev(obd->obd_lu_dev);
1444         int rc;
1445
1446         ENTRY;
1447
1448         LASSERT(data != NULL);
1449         LASSERT(data->ocd_connect_flags & OBD_CONNECT_INDEX);
1450
1451         rc = client_connect_import(env, &osp->opd_exp, obd, cluuid, data,
1452                                    localdata);
1453         if (rc)
1454                 RETURN(rc);
1455
1456         *exp = osp->opd_exp;
1457
1458         osp->opd_obd->u.cli.cl_seq->lcs_exp = class_export_get(osp->opd_exp);
1459         /* precreate thread can be waiting for us to initialize fld client */
1460         wake_up(&osp->opd_pre_waitq);
1461
1462         RETURN(rc);
1463 }
1464
1465 /**
1466  * Implementation of obd_ops::o_disconnect
1467  *
1468  * Disconnect the export for the OSP.  This is called by LOD to release the
1469  * OSP during cleanup (\see lod_del_device()). The OSP will be released after
1470  * the export is released.
1471  *
1472  * \param[in] exp       export to be disconnected.
1473  *
1474  * \retval 0            0 if disconnection succeed
1475  * \retval negative     negative errno if disconnection failed
1476  */
1477 static int osp_obd_disconnect(struct obd_export *exp)
1478 {
1479         struct obd_device *obd = exp->exp_obd;
1480         int                rc;
1481         ENTRY;
1482
1483         rc = class_disconnect(exp);
1484         if (rc) {
1485                 CERROR("%s: class disconnect error: rc = %d\n",
1486                        obd->obd_name, rc);
1487                 RETURN(rc);
1488         }
1489
1490         /* destroy the device */
1491         class_manual_cleanup(obd);
1492
1493         RETURN(rc);
1494 }
1495
1496 /**
1497  * Implementation of obd_ops::o_statfs
1498  *
1499  * Send a RPC to the remote target to get statfs status. This is only used
1500  * in lprocfs helpers by obd_statfs.
1501  *
1502  * \param[in] env       execution environment
1503  * \param[in] exp       connection state from this OSP to the parent (LOD)
1504  *                      device
1505  * \param[out] osfs     hold the statfs result
1506  * \param[in] unused    Not used in this function for now
1507  * \param[in] flags     flags to indicate how OSP will issue the RPC
1508  *
1509  * \retval 0            0 if statfs succeeded.
1510  * \retval negative     negative errno if statfs failed.
1511  */
1512 static int osp_obd_statfs(const struct lu_env *env, struct obd_export *exp,
1513                           struct obd_statfs *osfs, time64_t unused, __u32 flags)
1514 {
1515         struct obd_statfs       *msfs;
1516         struct ptlrpc_request   *req;
1517         struct obd_import       *imp = NULL, *imp0;
1518         int                      rc;
1519
1520         ENTRY;
1521
1522         /* Since the request might also come from lprocfs, so we need
1523          * sync this with client_disconnect_export Bug15684
1524          */
1525         with_imp_locked(exp->exp_obd, imp0, rc)
1526                 imp = class_import_get(imp0);
1527         if (rc)
1528                 RETURN(rc);
1529
1530         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
1531
1532         class_import_put(imp);
1533
1534         if (req == NULL)
1535                 RETURN(-ENOMEM);
1536
1537         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
1538         if (rc) {
1539                 ptlrpc_request_free(req);
1540                 RETURN(rc);
1541         }
1542         ptlrpc_request_set_replen(req);
1543         req->rq_request_portal = OST_CREATE_PORTAL;
1544         ptlrpc_at_set_req_timeout(req);
1545
1546         if (flags & OBD_STATFS_NODELAY) {
1547                 /* procfs requests not want stat in wait for avoid deadlock */
1548                 req->rq_no_resend = 1;
1549                 req->rq_no_delay = 1;
1550         }
1551
1552         rc = ptlrpc_queue_wait(req);
1553         if (rc)
1554                 GOTO(out, rc);
1555
1556         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
1557         if (msfs == NULL)
1558                 GOTO(out, rc = -EPROTO);
1559
1560         *osfs = *msfs;
1561
1562         EXIT;
1563 out:
1564         ptlrpc_req_finished(req);
1565         return rc;
1566 }
1567
1568 /**
1569  * Implementation of obd_ops::o_import_event
1570  *
1571  * This function is called when some related import event happens. It will
1572  * mark the necessary flags according to the event and notify the necessary
1573  * threads (mainly precreate thread).
1574  *
1575  * \param[in] obd       OSP OBD device
1576  * \param[in] imp       import attached from OSP to remote (OST/MDT) service
1577  * \param[in] event     event related to remote service (IMP_EVENT_*)
1578  *
1579  * \retval 0            0 if the event handling succeeded.
1580  * \retval negative     negative errno if the event handling failed.
1581  */
1582 static int osp_import_event(struct obd_device *obd, struct obd_import *imp,
1583                             enum obd_import_event event)
1584 {
1585         struct osp_device *d = lu2osp_dev(obd->obd_lu_dev);
1586         int rc;
1587
1588         switch (event) {
1589         case IMP_EVENT_DISCON:
1590                 d->opd_got_disconnected = 1;
1591                 d->opd_imp_connected = 0;
1592                 if (d->opd_connect_mdt)
1593                         break;
1594
1595                 if (d->opd_pre != NULL) {
1596                         osp_pre_update_status(d, -ENODEV);
1597                         wake_up(&d->opd_pre_waitq);
1598                 }
1599
1600                 CDEBUG(D_HA, "got disconnected\n");
1601                 break;
1602         case IMP_EVENT_INACTIVE:
1603                 d->opd_imp_active = 0;
1604                 d->opd_imp_connected = 0;
1605                 d->opd_obd->obd_inactive = 1;
1606                 if (d->opd_connect_mdt)
1607                         break;
1608                 if (d->opd_pre != NULL) {
1609                         /* Import is invalid, we can`t get stripes so
1610                          * wakeup waiters */
1611                         rc = imp->imp_deactive ? -ESHUTDOWN : -ENODEV;
1612                         osp_pre_update_status(d, rc);
1613                         wake_up(&d->opd_pre_waitq);
1614                 }
1615
1616                 CDEBUG(D_HA, "got inactive\n");
1617                 break;
1618         case IMP_EVENT_ACTIVE:
1619                 d->opd_imp_active = 1;
1620
1621                 d->opd_new_connection = 1;
1622                 d->opd_imp_connected = 1;
1623                 d->opd_imp_seen_connected = 1;
1624                 d->opd_obd->obd_inactive = 0;
1625                 wake_up(&d->opd_pre_waitq);
1626                 if (d->opd_connect_mdt)
1627                         break;
1628
1629                 osp_sync_check_for_work(d);
1630                 CDEBUG(D_HA, "got connected\n");
1631                 break;
1632         case IMP_EVENT_INVALIDATE:
1633                 if (d->opd_connect_mdt)
1634                         osp_invalidate_request(d);
1635
1636                 if (obd->obd_namespace == NULL)
1637                         break;
1638                 ldlm_namespace_cleanup(obd->obd_namespace, LDLM_FL_LOCAL_ONLY);
1639                 break;
1640         case IMP_EVENT_OCD:
1641         case IMP_EVENT_DEACTIVATE:
1642         case IMP_EVENT_ACTIVATE:
1643                 break;
1644         default:
1645                 CERROR("%s: unsupported import event: %#x\n",
1646                        obd->obd_name, event);
1647         }
1648         return 0;
1649 }
1650
1651 /**
1652  * Implementation of obd_ops: o_iocontrol
1653  *
1654  * This function is the ioctl handler for OSP. Note: lctl will access the OSP
1655  * directly by ioctl, instead of through the MDS stack.
1656  *
1657  * param[in] cmd        ioctl command.
1658  * param[in] exp        export of this OSP.
1659  * param[in] len        data length of \a karg.
1660  * param[in] karg       input argument which is packed as
1661  *                      obd_ioctl_data
1662  * param[out] uarg      pointer to userspace buffer (must access by
1663  *                      copy_to_user()).
1664  *
1665  * \retval 0            0 if the ioctl handling succeeded.
1666  * \retval negative     negative errno if the ioctl handling failed.
1667  */
1668 static int osp_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1669                          void *karg, void __user *uarg)
1670 {
1671         struct obd_device *obd = exp->exp_obd;
1672         struct osp_device *d;
1673         struct obd_ioctl_data *data;
1674         int rc = -EINVAL;
1675
1676         ENTRY;
1677         CDEBUG(D_IOCTL, "%s: cmd=%x len=%u karg=%pK uarg=%pK\n",
1678                exp->exp_obd->obd_name, cmd, len, karg, uarg);
1679         if (unlikely(karg == NULL)) {
1680                 CERROR("%s: iocontrol from '%s' cmd=%x karg=NULL: rc = %d\n",
1681                        obd->obd_name, current->comm, cmd, rc);
1682                 RETURN(rc);
1683         }
1684         data = karg;
1685
1686         LASSERT(obd->obd_lu_dev);
1687         d = lu2osp_dev(obd->obd_lu_dev);
1688         LASSERT(d->opd_dt_dev.dd_ops == &osp_dt_ops);
1689
1690         if (!try_module_get(THIS_MODULE)) {
1691                 CERROR("%s: cannot get module '%s'\n", obd->obd_name,
1692                        module_name(THIS_MODULE));
1693                 return -EINVAL;
1694         }
1695
1696         switch (cmd) {
1697         case OBD_IOC_CLIENT_RECOVER:
1698                 rc = ptlrpc_recover_import(obd->u.cli.cl_import,
1699                                            data->ioc_inlbuf1, 0);
1700                 if (rc > 0)
1701                         rc = 0;
1702                 break;
1703 #ifdef IOC_OSC_SET_ACTIVE
1704         case_OBD_IOC_DEPRECATED_FT(IOC_OSC_SET_ACTIVE, obd->obd_name, 2, 17);
1705 #endif
1706         case OBD_IOC_SET_ACTIVE:
1707                 rc = ptlrpc_set_import_active(obd->u.cli.cl_import,
1708                                               data->ioc_offset);
1709                 break;
1710         default:
1711                 rc = OBD_IOC_ERROR(obd->obd_name, cmd, "unrecognized", -ENOTTY);
1712                 break;
1713         }
1714         module_put(THIS_MODULE);
1715         return rc;
1716 }
1717
1718 /**
1719  * Implementation of obd_ops::o_get_info
1720  *
1721  * Retrieve information by key. Retrieval starts from the top layer
1722  * (MDT) of the MDS stack and traverses the stack by calling the
1723  * obd_get_info() method of the next sub-layer.
1724  *
1725  * \param[in] env       execution environment
1726  * \param[in] exp       export of this OSP
1727  * \param[in] keylen    length of \a key
1728  * \param[in] key       the key
1729  * \param[out] vallen   length of \a val
1730  * \param[out] val      holds the value returned by the key
1731  *
1732  * \retval 0            0 if getting information succeeded.
1733  * \retval negative     negative errno if getting information failed.
1734  */
1735 static int osp_obd_get_info(const struct lu_env *env, struct obd_export *exp,
1736                             __u32 keylen, void *key, __u32 *vallen, void *val)
1737 {
1738         int rc = -EINVAL;
1739
1740         if (KEY_IS(KEY_OSP_CONNECTED)) {
1741                 struct obd_device       *obd = exp->exp_obd;
1742                 struct osp_device       *osp;
1743
1744                 if (!obd->obd_set_up || obd->obd_stopping)
1745                         RETURN(-EAGAIN);
1746
1747                 osp = lu2osp_dev(obd->obd_lu_dev);
1748                 LASSERT(osp);
1749                 /*
1750                  * 1.8/2.0 behaviour is that OST being connected once at least
1751                  * is considered "healthy". and one "healthy" OST is enough to
1752                  * allow lustre clients to connect to MDS
1753                  */
1754                 RETURN(!osp->opd_imp_seen_connected);
1755         }
1756
1757         RETURN(rc);
1758 }
1759
1760 static int osp_obd_set_info_async(const struct lu_env *env,
1761                                   struct obd_export *exp,
1762                                   u32 keylen, void *key,
1763                                   u32 vallen, void *val,
1764                                   struct ptlrpc_request_set *set)
1765 {
1766         struct obd_device       *obd = exp->exp_obd;
1767         struct obd_import       *imp = obd->u.cli.cl_import;
1768         struct osp_device       *osp;
1769         struct ptlrpc_request   *req;
1770         char                    *tmp;
1771         int                      rc;
1772
1773         if (KEY_IS(KEY_SPTLRPC_CONF)) {
1774                 sptlrpc_conf_client_adapt(exp->exp_obd);
1775                 RETURN(0);
1776         }
1777
1778         LASSERT(set != NULL);
1779         if (!obd->obd_set_up || obd->obd_stopping)
1780                 RETURN(-EAGAIN);
1781         osp = lu2osp_dev(obd->obd_lu_dev);
1782
1783         req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
1784         if (req == NULL)
1785                 RETURN(-ENOMEM);
1786
1787         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
1788                              RCL_CLIENT, keylen);
1789         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
1790                              RCL_CLIENT, vallen);
1791         if (osp->opd_connect_mdt)
1792                 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, MDS_SET_INFO);
1793         else
1794                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
1795         if (rc) {
1796                 ptlrpc_request_free(req);
1797                 RETURN(rc);
1798         }
1799
1800         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
1801         memcpy(tmp, key, keylen);
1802         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
1803         memcpy(tmp, val, vallen);
1804
1805         ptlrpc_request_set_replen(req);
1806         ptlrpc_set_add_req(set, req);
1807         ptlrpc_check_set(NULL, set);
1808
1809         RETURN(0);
1810 }
1811
1812 /* context key constructor/destructor: mdt_key_init, mdt_key_fini */
1813 LU_KEY_INIT_FINI(osp, struct osp_thread_info);
1814 static void osp_key_exit(const struct lu_context *ctx,
1815                          struct lu_context_key *key, void *data)
1816 {
1817         struct osp_thread_info *info = data;
1818
1819         info->osi_attr.la_valid = 0;
1820 }
1821
1822 struct lu_context_key osp_thread_key = {
1823         .lct_tags = LCT_MD_THREAD,
1824         .lct_init = osp_key_init,
1825         .lct_fini = osp_key_fini,
1826         .lct_exit = osp_key_exit
1827 };
1828
1829 /* context key constructor/destructor: mdt_txn_key_init, mdt_txn_key_fini */
1830 LU_KEY_INIT_FINI(osp_txn, struct osp_txn_info);
1831
1832 struct lu_context_key osp_txn_key = {
1833         .lct_tags = LCT_OSP_THREAD,
1834         .lct_init = osp_txn_key_init,
1835         .lct_fini = osp_txn_key_fini
1836 };
1837 LU_TYPE_INIT_FINI(osp, &osp_thread_key, &osp_txn_key);
1838
1839 static const struct lu_device_type_operations osp_device_type_ops = {
1840         .ldto_init           = osp_type_init,
1841         .ldto_fini           = osp_type_fini,
1842
1843         .ldto_start          = osp_type_start,
1844         .ldto_stop           = osp_type_stop,
1845
1846         .ldto_device_alloc   = osp_device_alloc,
1847         .ldto_device_free    = osp_device_free,
1848
1849         .ldto_device_fini    = osp_device_fini
1850 };
1851
1852 static struct lu_device_type osp_device_type = {
1853         .ldt_tags     = LU_DEVICE_DT,
1854         .ldt_name     = LUSTRE_OSP_NAME,
1855         .ldt_ops      = &osp_device_type_ops,
1856         .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD,
1857 };
1858
1859 static const struct obd_ops osp_obd_device_ops = {
1860         .o_owner        = THIS_MODULE,
1861         .o_add_conn     = client_import_add_conn,
1862         .o_del_conn     = client_import_del_conn,
1863         .o_reconnect    = osp_reconnect,
1864         .o_connect      = osp_obd_connect,
1865         .o_disconnect   = osp_obd_disconnect,
1866         .o_get_info     = osp_obd_get_info,
1867         .o_set_info_async = osp_obd_set_info_async,
1868         .o_import_event = osp_import_event,
1869         .o_iocontrol    = osp_iocontrol,
1870         .o_statfs       = osp_obd_statfs,
1871         .o_fid_init     = client_fid_init,
1872         .o_fid_fini     = client_fid_fini,
1873 };
1874
1875 /**
1876  * Initialize OSP module.
1877  *
1878  * Register device types OSP and Light Weight Proxy (LWP) (\see lwp_dev.c)
1879  * in obd_types (\see class_obd.c).  Initialize procfs for the
1880  * the OSP device.  Note: OSP was called OSC before Lustre 2.4,
1881  * so for compatibility it still uses the name "osc" in procfs.
1882  * This is called at module load time.
1883  *
1884  * \retval 0            0 if initialization succeeds.
1885  * \retval negative     negative errno if initialization failed.
1886  */
1887 static int __init osp_init(void)
1888 {
1889         struct obd_type *sym;
1890         int rc;
1891
1892         rc = lu_kmem_init(osp_caches);
1893         if (rc)
1894                 return rc;
1895
1896         rc = class_register_type(&osp_obd_device_ops, NULL, false,
1897                                  LUSTRE_OSP_NAME, &osp_device_type);
1898         if (rc != 0) {
1899                 lu_kmem_fini(osp_caches);
1900                 return rc;
1901         }
1902
1903         rc = class_register_type(&lwp_obd_device_ops, NULL, false,
1904                                  LUSTRE_LWP_NAME, &lwp_device_type);
1905         if (rc != 0) {
1906                 class_unregister_type(LUSTRE_OSP_NAME);
1907                 lu_kmem_fini(osp_caches);
1908                 return rc;
1909         }
1910
1911         /* create "osc" entry for compatibility purposes */
1912         sym = class_add_symlinks(LUSTRE_OSC_NAME, true);
1913         if (IS_ERR(sym)) {
1914                 rc = PTR_ERR(sym);
1915                 /* does real "osc" already exist ? */
1916                 if (rc == -EEXIST)
1917                         rc = 0;
1918         }
1919
1920         return rc;
1921 }
1922
1923 /**
1924  * Finalize OSP module.
1925  *
1926  * This callback is called when kernel unloads OSP module from memory, and
1927  * it will deregister OSP and LWP device type from obd_types (\see class_obd.c).
1928  */
1929 static void __exit osp_exit(void)
1930 {
1931         struct obd_type *sym = class_search_type(LUSTRE_OSC_NAME);
1932
1933         /* if this was never fully initialized by the osc layer
1934          * then we are responsible for freeing this obd_type
1935          */
1936         if (sym) {
1937                 /* final put if we manage this obd type */
1938                 if (sym->typ_sym_filter)
1939                         kobject_put(&sym->typ_kobj);
1940                 /* put reference taken by class_search_type */
1941                 kobject_put(&sym->typ_kobj);
1942         }
1943
1944         class_unregister_type(LUSTRE_LWP_NAME);
1945         class_unregister_type(LUSTRE_OSP_NAME);
1946         lu_kmem_fini(osp_caches);
1947 }
1948
1949 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
1950 MODULE_DESCRIPTION("Lustre OSD Storage Proxy ("LUSTRE_OSP_NAME")");
1951 MODULE_VERSION(LUSTRE_VERSION_STRING);
1952 MODULE_LICENSE("GPL");
1953
1954 module_init(osp_init);
1955 module_exit(osp_exit);