Whamcloud - gitweb
1ef96375366843af5e1afb5173616f23e6b6bf2a
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
55
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59                               const char *status, int locks, int debug_level);
60
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
64
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         module_put(type->typ_dt_ops->o_owner);
158         atomic_dec(&type->typ_refcnt);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         if (type->typ_md_ops)
176                 OBD_FREE_PTR(type->typ_md_ops);
177         if (type->typ_dt_ops)
178                 OBD_FREE_PTR(type->typ_dt_ops);
179
180         OBD_FREE(type, sizeof(*type));
181 }
182
183 static struct kobj_type class_ktype = {
184         .sysfs_ops      = &lustre_sysfs_ops,
185         .release        = class_sysfs_release,
186 };
187
188 #ifdef HAVE_SERVER_SUPPORT
189 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
190 {
191         struct dentry *symlink;
192         struct obd_type *type;
193         int rc;
194
195         type = class_search_type(name);
196         if (type) {
197                 kobject_put(&type->typ_kobj);
198                 return ERR_PTR(-EEXIST);
199         }
200
201         OBD_ALLOC(type, sizeof(*type));
202         if (!type)
203                 return ERR_PTR(-ENOMEM);
204
205         type->typ_kobj.kset = lustre_kset;
206         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
207                                   &lustre_kset->kobj, "%s", name);
208         if (rc)
209                 return ERR_PTR(rc);
210
211         symlink = debugfs_create_dir(name, debugfs_lustre_root);
212         if (IS_ERR_OR_NULL(symlink)) {
213                 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
214                 kobject_put(&type->typ_kobj);
215                 return ERR_PTR(rc);
216         }
217         type->typ_debugfs_entry = symlink;
218         type->typ_sym_filter = true;
219
220         if (enable_proc) {
221                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
222                                                       NULL, NULL);
223                 if (IS_ERR(type->typ_procroot)) {
224                         CERROR("%s: can't create compat proc entry: %d\n",
225                                name, (int)PTR_ERR(type->typ_procroot));
226                         type->typ_procroot = NULL;
227                 }
228         }
229
230         return type;
231 }
232 EXPORT_SYMBOL(class_add_symlinks);
233 #endif /* HAVE_SERVER_SUPPORT */
234
235 #define CLASS_MAX_NAME 1024
236
237 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
238                         bool enable_proc, struct lprocfs_vars *vars,
239                         const char *name, struct lu_device_type *ldt)
240 {
241         struct obd_type *type;
242         int rc;
243
244         ENTRY;
245         /* sanity check */
246         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
247
248         type = class_search_type(name);
249         if (type) {
250 #ifdef HAVE_SERVER_SUPPORT
251                 if (type->typ_sym_filter)
252                         goto dir_exist;
253 #endif /* HAVE_SERVER_SUPPORT */
254                 kobject_put(&type->typ_kobj);
255                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
256                 RETURN(-EEXIST);
257         }
258
259         OBD_ALLOC(type, sizeof(*type));
260         if (type == NULL)
261                 RETURN(-ENOMEM);
262
263         type->typ_kobj.kset = lustre_kset;
264         kobject_init(&type->typ_kobj, &class_ktype);
265 #ifdef HAVE_SERVER_SUPPORT
266 dir_exist:
267 #endif /* HAVE_SERVER_SUPPORT */
268         OBD_ALLOC_PTR(type->typ_dt_ops);
269         OBD_ALLOC_PTR(type->typ_md_ops);
270
271         if (type->typ_dt_ops == NULL ||
272             type->typ_md_ops == NULL)
273                 GOTO (failed, rc = -ENOMEM);
274
275         *(type->typ_dt_ops) = *dt_ops;
276         /* md_ops is optional */
277         if (md_ops)
278                 *(type->typ_md_ops) = *md_ops;
279
280 #ifdef HAVE_SERVER_SUPPORT
281         if (type->typ_sym_filter) {
282                 type->typ_sym_filter = false;
283                 kobject_put(&type->typ_kobj);
284                 goto setup_ldt;
285         }
286 #endif
287 #ifdef CONFIG_PROC_FS
288         if (enable_proc && !type->typ_procroot) {
289                 type->typ_procroot = lprocfs_register(name,
290                                                       proc_lustre_root,
291                                                       NULL, type);
292                 if (IS_ERR(type->typ_procroot)) {
293                         rc = PTR_ERR(type->typ_procroot);
294                         type->typ_procroot = NULL;
295                         GOTO(failed, rc);
296                 }
297         }
298 #endif
299         type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
300                                                     vars, type);
301         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
302                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
303                                              : -ENOMEM;
304                 type->typ_debugfs_entry = NULL;
305                 GOTO(failed, rc);
306         }
307
308         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
309         if (rc)
310                 GOTO(failed, rc);
311 #ifdef HAVE_SERVER_SUPPORT
312 setup_ldt:
313 #endif
314         if (ldt) {
315                 type->typ_lu = ldt;
316                 rc = lu_device_type_init(ldt);
317                 if (rc)
318                         GOTO(failed, rc);
319         }
320
321         RETURN(0);
322
323 failed:
324         kobject_put(&type->typ_kobj);
325
326         RETURN(rc);
327 }
328 EXPORT_SYMBOL(class_register_type);
329
330 int class_unregister_type(const char *name)
331 {
332         struct obd_type *type = class_search_type(name);
333         int rc = 0;
334         ENTRY;
335
336         if (!type) {
337                 CERROR("unknown obd type\n");
338                 RETURN(-EINVAL);
339         }
340
341         if (atomic_read(&type->typ_refcnt)) {
342                 CERROR("type %s has refcount (%d)\n", name,
343                        atomic_read(&type->typ_refcnt));
344                 /* This is a bad situation, let's make the best of it */
345                 /* Remove ops, but leave the name for debugging */
346                 OBD_FREE_PTR(type->typ_dt_ops);
347                 OBD_FREE_PTR(type->typ_md_ops);
348                 GOTO(out_put, rc = -EBUSY);
349         }
350
351         /* Put the final ref */
352         kobject_put(&type->typ_kobj);
353 out_put:
354         /* Put the ref returned by class_search_type() */
355         kobject_put(&type->typ_kobj);
356
357         RETURN(rc);
358 } /* class_unregister_type */
359 EXPORT_SYMBOL(class_unregister_type);
360
361 /**
362  * Create a new obd device.
363  *
364  * Allocate the new obd_device and initialize it.
365  *
366  * \param[in] type_name obd device type string.
367  * \param[in] name      obd device name.
368  * \param[in] uuid      obd device UUID
369  *
370  * \retval newdev         pointer to created obd_device
371  * \retval ERR_PTR(errno) on error
372  */
373 struct obd_device *class_newdev(const char *type_name, const char *name,
374                                 const char *uuid)
375 {
376         struct obd_device *newdev;
377         struct obd_type *type = NULL;
378         ENTRY;
379
380         if (strlen(name) >= MAX_OBD_NAME) {
381                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
382                 RETURN(ERR_PTR(-EINVAL));
383         }
384
385         type = class_get_type(type_name);
386         if (type == NULL){
387                 CERROR("OBD: unknown type: %s\n", type_name);
388                 RETURN(ERR_PTR(-ENODEV));
389         }
390
391         newdev = obd_device_alloc();
392         if (newdev == NULL) {
393                 class_put_type(type);
394                 RETURN(ERR_PTR(-ENOMEM));
395         }
396         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
397         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
398         newdev->obd_type = type;
399         newdev->obd_minor = -1;
400
401         rwlock_init(&newdev->obd_pool_lock);
402         newdev->obd_pool_limit = 0;
403         newdev->obd_pool_slv = 0;
404
405         INIT_LIST_HEAD(&newdev->obd_exports);
406         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
407         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
408         INIT_LIST_HEAD(&newdev->obd_exports_timed);
409         INIT_LIST_HEAD(&newdev->obd_nid_stats);
410         spin_lock_init(&newdev->obd_nid_lock);
411         spin_lock_init(&newdev->obd_dev_lock);
412         mutex_init(&newdev->obd_dev_mutex);
413         spin_lock_init(&newdev->obd_osfs_lock);
414         /* newdev->obd_osfs_age must be set to a value in the distant
415          * past to guarantee a fresh statfs is fetched on mount. */
416         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
417
418         /* XXX belongs in setup not attach  */
419         init_rwsem(&newdev->obd_observer_link_sem);
420         /* recovery data */
421         spin_lock_init(&newdev->obd_recovery_task_lock);
422         init_waitqueue_head(&newdev->obd_next_transno_waitq);
423         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
424         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
425         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
426         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
427         INIT_LIST_HEAD(&newdev->obd_evict_list);
428         INIT_LIST_HEAD(&newdev->obd_lwp_list);
429
430         llog_group_init(&newdev->obd_olg);
431         /* Detach drops this */
432         atomic_set(&newdev->obd_refcount, 1);
433         lu_ref_init(&newdev->obd_reference);
434         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
435
436         newdev->obd_conn_inprogress = 0;
437
438         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
439
440         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
441                newdev->obd_name, newdev);
442
443         return newdev;
444 }
445
446 /**
447  * Free obd device.
448  *
449  * \param[in] obd obd_device to be freed
450  *
451  * \retval none
452  */
453 void class_free_dev(struct obd_device *obd)
454 {
455         struct obd_type *obd_type = obd->obd_type;
456
457         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
458                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
460                  "obd %p != obd_devs[%d] %p\n",
461                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
462         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
463                  "obd_refcount should be 0, not %d\n",
464                  atomic_read(&obd->obd_refcount));
465         LASSERT(obd_type != NULL);
466
467         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
468                obd->obd_name, obd->obd_type->typ_name);
469
470         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
471                          obd->obd_name, obd->obd_uuid.uuid);
472         if (obd->obd_stopping) {
473                 int err;
474
475                 /* If we're not stopping, we were never set up */
476                 err = obd_cleanup(obd);
477                 if (err)
478                         CERROR("Cleanup %s returned %d\n",
479                                 obd->obd_name, err);
480         }
481
482         obd_device_free(obd);
483
484         class_put_type(obd_type);
485 }
486
487 /**
488  * Unregister obd device.
489  *
490  * Free slot in obd_dev[] used by \a obd.
491  *
492  * \param[in] new_obd obd_device to be unregistered
493  *
494  * \retval none
495  */
496 void class_unregister_device(struct obd_device *obd)
497 {
498         write_lock(&obd_dev_lock);
499         if (obd->obd_minor >= 0) {
500                 LASSERT(obd_devs[obd->obd_minor] == obd);
501                 obd_devs[obd->obd_minor] = NULL;
502                 obd->obd_minor = -1;
503         }
504         write_unlock(&obd_dev_lock);
505 }
506
507 /**
508  * Register obd device.
509  *
510  * Find free slot in obd_devs[], fills it with \a new_obd.
511  *
512  * \param[in] new_obd obd_device to be registered
513  *
514  * \retval 0          success
515  * \retval -EEXIST    device with this name is registered
516  * \retval -EOVERFLOW obd_devs[] is full
517  */
518 int class_register_device(struct obd_device *new_obd)
519 {
520         int ret = 0;
521         int i;
522         int new_obd_minor = 0;
523         bool minor_assign = false;
524         bool retried = false;
525
526 again:
527         write_lock(&obd_dev_lock);
528         for (i = 0; i < class_devno_max(); i++) {
529                 struct obd_device *obd = class_num2obd(i);
530
531                 if (obd != NULL &&
532                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
533
534                         if (!retried) {
535                                 write_unlock(&obd_dev_lock);
536
537                                 /* the obd_device could be waited to be
538                                  * destroyed by the "obd_zombie_impexp_thread".
539                                  */
540                                 obd_zombie_barrier();
541                                 retried = true;
542                                 goto again;
543                         }
544
545                         CERROR("%s: already exists, won't add\n",
546                                obd->obd_name);
547                         /* in case we found a free slot before duplicate */
548                         minor_assign = false;
549                         ret = -EEXIST;
550                         break;
551                 }
552                 if (!minor_assign && obd == NULL) {
553                         new_obd_minor = i;
554                         minor_assign = true;
555                 }
556         }
557
558         if (minor_assign) {
559                 new_obd->obd_minor = new_obd_minor;
560                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
561                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
562                 obd_devs[new_obd_minor] = new_obd;
563         } else {
564                 if (ret == 0) {
565                         ret = -EOVERFLOW;
566                         CERROR("%s: all %u/%u devices used, increase "
567                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
568                                i, class_devno_max(), ret);
569                 }
570         }
571         write_unlock(&obd_dev_lock);
572
573         RETURN(ret);
574 }
575
576 static int class_name2dev_nolock(const char *name)
577 {
578         int i;
579
580         if (!name)
581                 return -1;
582
583         for (i = 0; i < class_devno_max(); i++) {
584                 struct obd_device *obd = class_num2obd(i);
585
586                 if (obd && strcmp(name, obd->obd_name) == 0) {
587                         /* Make sure we finished attaching before we give
588                            out any references */
589                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
590                         if (obd->obd_attached) {
591                                 return i;
592                         }
593                         break;
594                 }
595         }
596
597         return -1;
598 }
599
600 int class_name2dev(const char *name)
601 {
602         int i;
603
604         if (!name)
605                 return -1;
606
607         read_lock(&obd_dev_lock);
608         i = class_name2dev_nolock(name);
609         read_unlock(&obd_dev_lock);
610
611         return i;
612 }
613 EXPORT_SYMBOL(class_name2dev);
614
615 struct obd_device *class_name2obd(const char *name)
616 {
617         int dev = class_name2dev(name);
618
619         if (dev < 0 || dev > class_devno_max())
620                 return NULL;
621         return class_num2obd(dev);
622 }
623 EXPORT_SYMBOL(class_name2obd);
624
625 int class_uuid2dev_nolock(struct obd_uuid *uuid)
626 {
627         int i;
628
629         for (i = 0; i < class_devno_max(); i++) {
630                 struct obd_device *obd = class_num2obd(i);
631
632                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
633                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
634                         return i;
635                 }
636         }
637
638         return -1;
639 }
640
641 int class_uuid2dev(struct obd_uuid *uuid)
642 {
643         int i;
644
645         read_lock(&obd_dev_lock);
646         i = class_uuid2dev_nolock(uuid);
647         read_unlock(&obd_dev_lock);
648
649         return i;
650 }
651 EXPORT_SYMBOL(class_uuid2dev);
652
653 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
654 {
655         int dev = class_uuid2dev(uuid);
656         if (dev < 0)
657                 return NULL;
658         return class_num2obd(dev);
659 }
660 EXPORT_SYMBOL(class_uuid2obd);
661
662 /**
663  * Get obd device from ::obd_devs[]
664  *
665  * \param num [in] array index
666  *
667  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
668  *         otherwise return the obd device there.
669  */
670 struct obd_device *class_num2obd(int num)
671 {
672         struct obd_device *obd = NULL;
673
674         if (num < class_devno_max()) {
675                 obd = obd_devs[num];
676                 if (obd == NULL)
677                         return NULL;
678
679                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
680                          "%p obd_magic %08x != %08x\n",
681                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
682                 LASSERTF(obd->obd_minor == num,
683                          "%p obd_minor %0d != %0d\n",
684                          obd, obd->obd_minor, num);
685         }
686
687         return obd;
688 }
689
690 /**
691  * Find obd in obd_dev[] by name or uuid.
692  *
693  * Increment obd's refcount if found.
694  *
695  * \param[in] str obd name or uuid
696  *
697  * \retval NULL    if not found
698  * \retval target  pointer to found obd_device
699  */
700 struct obd_device *class_dev_by_str(const char *str)
701 {
702         struct obd_device *target = NULL;
703         struct obd_uuid tgtuuid;
704         int rc;
705
706         obd_str2uuid(&tgtuuid, str);
707
708         read_lock(&obd_dev_lock);
709         rc = class_uuid2dev_nolock(&tgtuuid);
710         if (rc < 0)
711                 rc = class_name2dev_nolock(str);
712
713         if (rc >= 0)
714                 target = class_num2obd(rc);
715
716         if (target != NULL)
717                 class_incref(target, "find", current);
718         read_unlock(&obd_dev_lock);
719
720         RETURN(target);
721 }
722 EXPORT_SYMBOL(class_dev_by_str);
723
724 /**
725  * Get obd devices count. Device in any
726  *    state are counted
727  * \retval obd device count
728  */
729 int get_devices_count(void)
730 {
731         int index, max_index = class_devno_max(), dev_count = 0;
732
733         read_lock(&obd_dev_lock);
734         for (index = 0; index <= max_index; index++) {
735                 struct obd_device *obd = class_num2obd(index);
736                 if (obd != NULL)
737                         dev_count++;
738         }
739         read_unlock(&obd_dev_lock);
740
741         return dev_count;
742 }
743 EXPORT_SYMBOL(get_devices_count);
744
745 void class_obd_list(void)
746 {
747         char *status;
748         int i;
749
750         read_lock(&obd_dev_lock);
751         for (i = 0; i < class_devno_max(); i++) {
752                 struct obd_device *obd = class_num2obd(i);
753
754                 if (obd == NULL)
755                         continue;
756                 if (obd->obd_stopping)
757                         status = "ST";
758                 else if (obd->obd_set_up)
759                         status = "UP";
760                 else if (obd->obd_attached)
761                         status = "AT";
762                 else
763                         status = "--";
764                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
765                          i, status, obd->obd_type->typ_name,
766                          obd->obd_name, obd->obd_uuid.uuid,
767                          atomic_read(&obd->obd_refcount));
768         }
769         read_unlock(&obd_dev_lock);
770 }
771
772 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
773    specified, then only the client with that uuid is returned,
774    otherwise any client connected to the tgt is returned. */
775 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
776                                           const char *type_name,
777                                           struct obd_uuid *grp_uuid)
778 {
779         int i;
780
781         read_lock(&obd_dev_lock);
782         for (i = 0; i < class_devno_max(); i++) {
783                 struct obd_device *obd = class_num2obd(i);
784
785                 if (obd == NULL)
786                         continue;
787                 if ((strncmp(obd->obd_type->typ_name, type_name,
788                              strlen(type_name)) == 0)) {
789                         if (obd_uuid_equals(tgt_uuid,
790                                             &obd->u.cli.cl_target_uuid) &&
791                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
792                                                          &obd->obd_uuid) : 1)) {
793                                 read_unlock(&obd_dev_lock);
794                                 return obd;
795                         }
796                 }
797         }
798         read_unlock(&obd_dev_lock);
799
800         return NULL;
801 }
802 EXPORT_SYMBOL(class_find_client_obd);
803
804 /* Iterate the obd_device list looking devices have grp_uuid. Start
805    searching at *next, and if a device is found, the next index to look
806    at is saved in *next. If next is NULL, then the first matching device
807    will always be returned. */
808 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
809 {
810         int i;
811
812         if (next == NULL)
813                 i = 0;
814         else if (*next >= 0 && *next < class_devno_max())
815                 i = *next;
816         else
817                 return NULL;
818
819         read_lock(&obd_dev_lock);
820         for (; i < class_devno_max(); i++) {
821                 struct obd_device *obd = class_num2obd(i);
822
823                 if (obd == NULL)
824                         continue;
825                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
826                         if (next != NULL)
827                                 *next = i+1;
828                         read_unlock(&obd_dev_lock);
829                         return obd;
830                 }
831         }
832         read_unlock(&obd_dev_lock);
833
834         return NULL;
835 }
836 EXPORT_SYMBOL(class_devices_in_group);
837
838 /**
839  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
840  * adjust sptlrpc settings accordingly.
841  */
842 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
843 {
844         struct obd_device  *obd;
845         const char         *type;
846         int                 i, rc = 0, rc2;
847
848         LASSERT(namelen > 0);
849
850         read_lock(&obd_dev_lock);
851         for (i = 0; i < class_devno_max(); i++) {
852                 obd = class_num2obd(i);
853
854                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
855                         continue;
856
857                 /* only notify mdc, osc, osp, lwp, mdt, ost
858                  * because only these have a -sptlrpc llog */
859                 type = obd->obd_type->typ_name;
860                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
861                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
862                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
863                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
864                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
865                     strcmp(type, LUSTRE_OST_NAME) != 0)
866                         continue;
867
868                 if (strncmp(obd->obd_name, fsname, namelen))
869                         continue;
870
871                 class_incref(obd, __FUNCTION__, obd);
872                 read_unlock(&obd_dev_lock);
873                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
874                                          sizeof(KEY_SPTLRPC_CONF),
875                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
876                 rc = rc ? rc : rc2;
877                 class_decref(obd, __FUNCTION__, obd);
878                 read_lock(&obd_dev_lock);
879         }
880         read_unlock(&obd_dev_lock);
881         return rc;
882 }
883 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
884
885 void obd_cleanup_caches(void)
886 {
887         ENTRY;
888         if (obd_device_cachep) {
889                 kmem_cache_destroy(obd_device_cachep);
890                 obd_device_cachep = NULL;
891         }
892
893         EXIT;
894 }
895
896 int obd_init_caches(void)
897 {
898         int rc;
899         ENTRY;
900
901         LASSERT(obd_device_cachep == NULL);
902         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
903                                 sizeof(struct obd_device),
904                                 0, 0, 0, sizeof(struct obd_device), NULL);
905         if (!obd_device_cachep)
906                 GOTO(out, rc = -ENOMEM);
907
908         RETURN(0);
909 out:
910         obd_cleanup_caches();
911         RETURN(rc);
912 }
913
914 static struct portals_handle_ops export_handle_ops;
915
916 /* map connection to client */
917 struct obd_export *class_conn2export(struct lustre_handle *conn)
918 {
919         struct obd_export *export;
920         ENTRY;
921
922         if (!conn) {
923                 CDEBUG(D_CACHE, "looking for null handle\n");
924                 RETURN(NULL);
925         }
926
927         if (conn->cookie == -1) {  /* this means assign a new connection */
928                 CDEBUG(D_CACHE, "want a new connection\n");
929                 RETURN(NULL);
930         }
931
932         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
933         export = class_handle2object(conn->cookie, &export_handle_ops);
934         RETURN(export);
935 }
936 EXPORT_SYMBOL(class_conn2export);
937
938 struct obd_device *class_exp2obd(struct obd_export *exp)
939 {
940         if (exp)
941                 return exp->exp_obd;
942         return NULL;
943 }
944 EXPORT_SYMBOL(class_exp2obd);
945
946 struct obd_import *class_exp2cliimp(struct obd_export *exp)
947 {
948         struct obd_device *obd = exp->exp_obd;
949         if (obd == NULL)
950                 return NULL;
951         return obd->u.cli.cl_import;
952 }
953 EXPORT_SYMBOL(class_exp2cliimp);
954
955 /* Export management functions */
956 static void class_export_destroy(struct obd_export *exp)
957 {
958         struct obd_device *obd = exp->exp_obd;
959         ENTRY;
960
961         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
962         LASSERT(obd != NULL);
963
964         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
965                exp->exp_client_uuid.uuid, obd->obd_name);
966
967         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
968         if (exp->exp_connection)
969                 ptlrpc_put_connection_superhack(exp->exp_connection);
970
971         LASSERT(list_empty(&exp->exp_outstanding_replies));
972         LASSERT(list_empty(&exp->exp_uncommitted_replies));
973         LASSERT(list_empty(&exp->exp_req_replay_queue));
974         LASSERT(list_empty(&exp->exp_hp_rpcs));
975         obd_destroy_export(exp);
976         /* self export doesn't hold a reference to an obd, although it
977          * exists until freeing of the obd */
978         if (exp != obd->obd_self_export)
979                 class_decref(obd, "export", exp);
980
981         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
982         EXIT;
983 }
984
985 static struct portals_handle_ops export_handle_ops = {
986         .hop_free   = NULL,
987         .hop_type       = "export",
988 };
989
990 struct obd_export *class_export_get(struct obd_export *exp)
991 {
992         refcount_inc(&exp->exp_handle.h_ref);
993         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
994                refcount_read(&exp->exp_handle.h_ref));
995         return exp;
996 }
997 EXPORT_SYMBOL(class_export_get);
998
999 void class_export_put(struct obd_export *exp)
1000 {
1001         LASSERT(exp != NULL);
1002         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
1003         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
1004         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1005                refcount_read(&exp->exp_handle.h_ref) - 1);
1006
1007         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
1008                 struct obd_device *obd = exp->exp_obd;
1009
1010                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1011                        exp, exp->exp_client_uuid.uuid);
1012
1013                 /* release nid stat refererence */
1014                 lprocfs_exp_cleanup(exp);
1015
1016                 if (exp == obd->obd_self_export) {
1017                         /* self export should be destroyed without
1018                          * zombie thread as it doesn't hold a
1019                          * reference to obd and doesn't hold any
1020                          * resources */
1021                         class_export_destroy(exp);
1022                         /* self export is destroyed, no class
1023                          * references exist and it is safe to free
1024                          * obd */
1025                         class_free_dev(obd);
1026                 } else {
1027                         LASSERT(!list_empty(&exp->exp_obd_chain));
1028                         obd_zombie_export_add(exp);
1029                 }
1030
1031         }
1032 }
1033 EXPORT_SYMBOL(class_export_put);
1034
1035 static void obd_zombie_exp_cull(struct work_struct *ws)
1036 {
1037         struct obd_export *export;
1038
1039         export = container_of(ws, struct obd_export, exp_zombie_work);
1040         class_export_destroy(export);
1041 }
1042
1043 /* Creates a new export, adds it to the hash table, and returns a
1044  * pointer to it. The refcount is 2: one for the hash reference, and
1045  * one for the pointer returned by this function. */
1046 struct obd_export *__class_new_export(struct obd_device *obd,
1047                                       struct obd_uuid *cluuid, bool is_self)
1048 {
1049         struct obd_export *export;
1050         struct cfs_hash *hash = NULL;
1051         int rc = 0;
1052         ENTRY;
1053
1054         OBD_ALLOC_PTR(export);
1055         if (!export)
1056                 return ERR_PTR(-ENOMEM);
1057
1058         export->exp_conn_cnt = 0;
1059         export->exp_lock_hash = NULL;
1060         export->exp_flock_hash = NULL;
1061         /* 2 = class_handle_hash + last */
1062         refcount_set(&export->exp_handle.h_ref, 2);
1063         atomic_set(&export->exp_rpc_count, 0);
1064         atomic_set(&export->exp_cb_count, 0);
1065         atomic_set(&export->exp_locks_count, 0);
1066 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1067         INIT_LIST_HEAD(&export->exp_locks_list);
1068         spin_lock_init(&export->exp_locks_list_guard);
1069 #endif
1070         atomic_set(&export->exp_replay_count, 0);
1071         export->exp_obd = obd;
1072         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1073         spin_lock_init(&export->exp_uncommitted_replies_lock);
1074         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1075         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1076         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1077         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1078         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1079         class_handle_hash(&export->exp_handle, &export_handle_ops);
1080         export->exp_last_request_time = ktime_get_real_seconds();
1081         spin_lock_init(&export->exp_lock);
1082         spin_lock_init(&export->exp_rpc_lock);
1083         INIT_HLIST_NODE(&export->exp_uuid_hash);
1084         INIT_HLIST_NODE(&export->exp_nid_hash);
1085         INIT_HLIST_NODE(&export->exp_gen_hash);
1086         spin_lock_init(&export->exp_bl_list_lock);
1087         INIT_LIST_HEAD(&export->exp_bl_list);
1088         INIT_LIST_HEAD(&export->exp_stale_list);
1089         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1090
1091         export->exp_sp_peer = LUSTRE_SP_ANY;
1092         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1093         export->exp_client_uuid = *cluuid;
1094         obd_init_export(export);
1095
1096         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1097                 spin_lock(&obd->obd_dev_lock);
1098                 /* shouldn't happen, but might race */
1099                 if (obd->obd_stopping)
1100                         GOTO(exit_unlock, rc = -ENODEV);
1101
1102                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1103                 if (hash == NULL)
1104                         GOTO(exit_unlock, rc = -ENODEV);
1105                 spin_unlock(&obd->obd_dev_lock);
1106
1107                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1108                 if (rc != 0) {
1109                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1110                                       obd->obd_name, cluuid->uuid, rc);
1111                         GOTO(exit_err, rc = -EALREADY);
1112                 }
1113         }
1114
1115         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1116         spin_lock(&obd->obd_dev_lock);
1117         if (obd->obd_stopping) {
1118                 if (hash)
1119                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1120                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1121         }
1122
1123         if (!is_self) {
1124                 class_incref(obd, "export", export);
1125                 list_add_tail(&export->exp_obd_chain_timed,
1126                               &obd->obd_exports_timed);
1127                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1128                 obd->obd_num_exports++;
1129         } else {
1130                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1131                 INIT_LIST_HEAD(&export->exp_obd_chain);
1132         }
1133         spin_unlock(&obd->obd_dev_lock);
1134         if (hash)
1135                 cfs_hash_putref(hash);
1136         RETURN(export);
1137
1138 exit_unlock:
1139         spin_unlock(&obd->obd_dev_lock);
1140 exit_err:
1141         if (hash)
1142                 cfs_hash_putref(hash);
1143         class_handle_unhash(&export->exp_handle);
1144         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1145         obd_destroy_export(export);
1146         OBD_FREE_PTR(export);
1147         return ERR_PTR(rc);
1148 }
1149
1150 struct obd_export *class_new_export(struct obd_device *obd,
1151                                     struct obd_uuid *uuid)
1152 {
1153         return __class_new_export(obd, uuid, false);
1154 }
1155 EXPORT_SYMBOL(class_new_export);
1156
1157 struct obd_export *class_new_export_self(struct obd_device *obd,
1158                                          struct obd_uuid *uuid)
1159 {
1160         return __class_new_export(obd, uuid, true);
1161 }
1162
1163 void class_unlink_export(struct obd_export *exp)
1164 {
1165         class_handle_unhash(&exp->exp_handle);
1166
1167         if (exp->exp_obd->obd_self_export == exp) {
1168                 class_export_put(exp);
1169                 return;
1170         }
1171
1172         spin_lock(&exp->exp_obd->obd_dev_lock);
1173         /* delete an uuid-export hashitem from hashtables */
1174         if (!hlist_unhashed(&exp->exp_uuid_hash))
1175                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1176                              &exp->exp_client_uuid,
1177                              &exp->exp_uuid_hash);
1178
1179 #ifdef HAVE_SERVER_SUPPORT
1180         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1181                 struct tg_export_data   *ted = &exp->exp_target_data;
1182                 struct cfs_hash         *hash;
1183
1184                 /* Because obd_gen_hash will not be released until
1185                  * class_cleanup(), so hash should never be NULL here */
1186                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1187                 LASSERT(hash != NULL);
1188                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1189                              &exp->exp_gen_hash);
1190                 cfs_hash_putref(hash);
1191         }
1192 #endif /* HAVE_SERVER_SUPPORT */
1193
1194         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1195         list_del_init(&exp->exp_obd_chain_timed);
1196         exp->exp_obd->obd_num_exports--;
1197         spin_unlock(&exp->exp_obd->obd_dev_lock);
1198         atomic_inc(&obd_stale_export_num);
1199
1200         /* A reference is kept by obd_stale_exports list */
1201         obd_stale_export_put(exp);
1202 }
1203 EXPORT_SYMBOL(class_unlink_export);
1204
1205 /* Import management functions */
1206 static void obd_zombie_import_free(struct obd_import *imp)
1207 {
1208         ENTRY;
1209
1210         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1211                 imp->imp_obd->obd_name);
1212
1213         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1214
1215         ptlrpc_put_connection_superhack(imp->imp_connection);
1216
1217         while (!list_empty(&imp->imp_conn_list)) {
1218                 struct obd_import_conn *imp_conn;
1219
1220                 imp_conn = list_entry(imp->imp_conn_list.next,
1221                                       struct obd_import_conn, oic_item);
1222                 list_del_init(&imp_conn->oic_item);
1223                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1224                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1225         }
1226
1227         LASSERT(imp->imp_sec == NULL);
1228         class_decref(imp->imp_obd, "import", imp);
1229         OBD_FREE_PTR(imp);
1230         EXIT;
1231 }
1232
1233 struct obd_import *class_import_get(struct obd_import *import)
1234 {
1235         atomic_inc(&import->imp_refcount);
1236         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1237                atomic_read(&import->imp_refcount),
1238                import->imp_obd->obd_name);
1239         return import;
1240 }
1241 EXPORT_SYMBOL(class_import_get);
1242
1243 void class_import_put(struct obd_import *imp)
1244 {
1245         ENTRY;
1246
1247         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1248
1249         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1250                atomic_read(&imp->imp_refcount) - 1,
1251                imp->imp_obd->obd_name);
1252
1253         if (atomic_dec_and_test(&imp->imp_refcount)) {
1254                 CDEBUG(D_INFO, "final put import %p\n", imp);
1255                 obd_zombie_import_add(imp);
1256         }
1257
1258         EXIT;
1259 }
1260 EXPORT_SYMBOL(class_import_put);
1261
1262 static void init_imp_at(struct imp_at *at) {
1263         int i;
1264         at_init(&at->iat_net_latency, 0, 0);
1265         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1266                 /* max service estimates are tracked on the server side, so
1267                    don't use the AT history here, just use the last reported
1268                    val. (But keep hist for proc histogram, worst_ever) */
1269                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1270                         AT_FLG_NOHIST);
1271         }
1272 }
1273
1274 static void obd_zombie_imp_cull(struct work_struct *ws)
1275 {
1276         struct obd_import *import;
1277
1278         import = container_of(ws, struct obd_import, imp_zombie_work);
1279         obd_zombie_import_free(import);
1280 }
1281
1282 struct obd_import *class_new_import(struct obd_device *obd)
1283 {
1284         struct obd_import *imp;
1285         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1286
1287         OBD_ALLOC(imp, sizeof(*imp));
1288         if (imp == NULL)
1289                 return NULL;
1290
1291         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1292         INIT_LIST_HEAD(&imp->imp_replay_list);
1293         INIT_LIST_HEAD(&imp->imp_sending_list);
1294         INIT_LIST_HEAD(&imp->imp_delayed_list);
1295         INIT_LIST_HEAD(&imp->imp_committed_list);
1296         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1297         imp->imp_known_replied_xid = 0;
1298         imp->imp_replay_cursor = &imp->imp_committed_list;
1299         spin_lock_init(&imp->imp_lock);
1300         imp->imp_last_success_conn = 0;
1301         imp->imp_state = LUSTRE_IMP_NEW;
1302         imp->imp_obd = class_incref(obd, "import", imp);
1303         rwlock_init(&imp->imp_sec_lock);
1304         init_waitqueue_head(&imp->imp_recovery_waitq);
1305         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1306
1307         if (curr_pid_ns->child_reaper)
1308                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1309         else
1310                 imp->imp_sec_refpid = 1;
1311
1312         atomic_set(&imp->imp_refcount, 2);
1313         atomic_set(&imp->imp_unregistering, 0);
1314         atomic_set(&imp->imp_inflight, 0);
1315         atomic_set(&imp->imp_replay_inflight, 0);
1316         atomic_set(&imp->imp_inval_count, 0);
1317         INIT_LIST_HEAD(&imp->imp_conn_list);
1318         init_imp_at(&imp->imp_at);
1319
1320         /* the default magic is V2, will be used in connect RPC, and
1321          * then adjusted according to the flags in request/reply. */
1322         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1323
1324         return imp;
1325 }
1326 EXPORT_SYMBOL(class_new_import);
1327
1328 void class_destroy_import(struct obd_import *import)
1329 {
1330         LASSERT(import != NULL);
1331         LASSERT(import != LP_POISON);
1332
1333         spin_lock(&import->imp_lock);
1334         import->imp_generation++;
1335         spin_unlock(&import->imp_lock);
1336         class_import_put(import);
1337 }
1338 EXPORT_SYMBOL(class_destroy_import);
1339
1340 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1341
1342 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1343 {
1344         spin_lock(&exp->exp_locks_list_guard);
1345
1346         LASSERT(lock->l_exp_refs_nr >= 0);
1347
1348         if (lock->l_exp_refs_target != NULL &&
1349             lock->l_exp_refs_target != exp) {
1350                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1351                               exp, lock, lock->l_exp_refs_target);
1352         }
1353         if ((lock->l_exp_refs_nr ++) == 0) {
1354                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1355                 lock->l_exp_refs_target = exp;
1356         }
1357         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1358                lock, exp, lock->l_exp_refs_nr);
1359         spin_unlock(&exp->exp_locks_list_guard);
1360 }
1361 EXPORT_SYMBOL(__class_export_add_lock_ref);
1362
1363 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1364 {
1365         spin_lock(&exp->exp_locks_list_guard);
1366         LASSERT(lock->l_exp_refs_nr > 0);
1367         if (lock->l_exp_refs_target != exp) {
1368                 LCONSOLE_WARN("lock %p, "
1369                               "mismatching export pointers: %p, %p\n",
1370                               lock, lock->l_exp_refs_target, exp);
1371         }
1372         if (-- lock->l_exp_refs_nr == 0) {
1373                 list_del_init(&lock->l_exp_refs_link);
1374                 lock->l_exp_refs_target = NULL;
1375         }
1376         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1377                lock, exp, lock->l_exp_refs_nr);
1378         spin_unlock(&exp->exp_locks_list_guard);
1379 }
1380 EXPORT_SYMBOL(__class_export_del_lock_ref);
1381 #endif
1382
1383 /* A connection defines an export context in which preallocation can
1384    be managed. This releases the export pointer reference, and returns
1385    the export handle, so the export refcount is 1 when this function
1386    returns. */
1387 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1388                   struct obd_uuid *cluuid)
1389 {
1390         struct obd_export *export;
1391         LASSERT(conn != NULL);
1392         LASSERT(obd != NULL);
1393         LASSERT(cluuid != NULL);
1394         ENTRY;
1395
1396         export = class_new_export(obd, cluuid);
1397         if (IS_ERR(export))
1398                 RETURN(PTR_ERR(export));
1399
1400         conn->cookie = export->exp_handle.h_cookie;
1401         class_export_put(export);
1402
1403         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1404                cluuid->uuid, conn->cookie);
1405         RETURN(0);
1406 }
1407 EXPORT_SYMBOL(class_connect);
1408
1409 /* if export is involved in recovery then clean up related things */
1410 static void class_export_recovery_cleanup(struct obd_export *exp)
1411 {
1412         struct obd_device *obd = exp->exp_obd;
1413
1414         spin_lock(&obd->obd_recovery_task_lock);
1415         if (obd->obd_recovering) {
1416                 if (exp->exp_in_recovery) {
1417                         spin_lock(&exp->exp_lock);
1418                         exp->exp_in_recovery = 0;
1419                         spin_unlock(&exp->exp_lock);
1420                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1421                         atomic_dec(&obd->obd_connected_clients);
1422                 }
1423
1424                 /* if called during recovery then should update
1425                  * obd_stale_clients counter,
1426                  * lightweight exports are not counted */
1427                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1428                         exp->exp_obd->obd_stale_clients++;
1429         }
1430         spin_unlock(&obd->obd_recovery_task_lock);
1431
1432         spin_lock(&exp->exp_lock);
1433         /** Cleanup req replay fields */
1434         if (exp->exp_req_replay_needed) {
1435                 exp->exp_req_replay_needed = 0;
1436
1437                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1438                 atomic_dec(&obd->obd_req_replay_clients);
1439         }
1440
1441         /** Cleanup lock replay data */
1442         if (exp->exp_lock_replay_needed) {
1443                 exp->exp_lock_replay_needed = 0;
1444
1445                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1446                 atomic_dec(&obd->obd_lock_replay_clients);
1447         }
1448         spin_unlock(&exp->exp_lock);
1449 }
1450
1451 /* This function removes 1-3 references from the export:
1452  * 1 - for export pointer passed
1453  * and if disconnect really need
1454  * 2 - removing from hash
1455  * 3 - in client_unlink_export
1456  * The export pointer passed to this function can destroyed */
1457 int class_disconnect(struct obd_export *export)
1458 {
1459         int already_disconnected;
1460         ENTRY;
1461
1462         if (export == NULL) {
1463                 CWARN("attempting to free NULL export %p\n", export);
1464                 RETURN(-EINVAL);
1465         }
1466
1467         spin_lock(&export->exp_lock);
1468         already_disconnected = export->exp_disconnected;
1469         export->exp_disconnected = 1;
1470         /*  We hold references of export for uuid hash
1471          *  and nid_hash and export link at least. So
1472          *  it is safe to call cfs_hash_del in there.  */
1473         if (!hlist_unhashed(&export->exp_nid_hash))
1474                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1475                              &export->exp_connection->c_peer.nid,
1476                              &export->exp_nid_hash);
1477         spin_unlock(&export->exp_lock);
1478
1479         /* class_cleanup(), abort_recovery(), and class_fail_export()
1480          * all end up in here, and if any of them race we shouldn't
1481          * call extra class_export_puts(). */
1482         if (already_disconnected) {
1483                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1484                 GOTO(no_disconn, already_disconnected);
1485         }
1486
1487         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1488                export->exp_handle.h_cookie);
1489
1490         class_export_recovery_cleanup(export);
1491         class_unlink_export(export);
1492 no_disconn:
1493         class_export_put(export);
1494         RETURN(0);
1495 }
1496 EXPORT_SYMBOL(class_disconnect);
1497
1498 /* Return non-zero for a fully connected export */
1499 int class_connected_export(struct obd_export *exp)
1500 {
1501         int connected = 0;
1502
1503         if (exp) {
1504                 spin_lock(&exp->exp_lock);
1505                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1506                 spin_unlock(&exp->exp_lock);
1507         }
1508         return connected;
1509 }
1510 EXPORT_SYMBOL(class_connected_export);
1511
1512 static void class_disconnect_export_list(struct list_head *list,
1513                                          enum obd_option flags)
1514 {
1515         int rc;
1516         struct obd_export *exp;
1517         ENTRY;
1518
1519         /* It's possible that an export may disconnect itself, but
1520          * nothing else will be added to this list. */
1521         while (!list_empty(list)) {
1522                 exp = list_entry(list->next, struct obd_export,
1523                                  exp_obd_chain);
1524                 /* need for safe call CDEBUG after obd_disconnect */
1525                 class_export_get(exp);
1526
1527                 spin_lock(&exp->exp_lock);
1528                 exp->exp_flags = flags;
1529                 spin_unlock(&exp->exp_lock);
1530
1531                 if (obd_uuid_equals(&exp->exp_client_uuid,
1532                                     &exp->exp_obd->obd_uuid)) {
1533                         CDEBUG(D_HA,
1534                                "exp %p export uuid == obd uuid, don't discon\n",
1535                                exp);
1536                         /* Need to delete this now so we don't end up pointing
1537                          * to work_list later when this export is cleaned up. */
1538                         list_del_init(&exp->exp_obd_chain);
1539                         class_export_put(exp);
1540                         continue;
1541                 }
1542
1543                 class_export_get(exp);
1544                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1545                        "last request at %lld\n",
1546                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1547                        exp, exp->exp_last_request_time);
1548                 /* release one export reference anyway */
1549                 rc = obd_disconnect(exp);
1550
1551                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1552                        obd_export_nid2str(exp), exp, rc);
1553                 class_export_put(exp);
1554         }
1555         EXIT;
1556 }
1557
1558 void class_disconnect_exports(struct obd_device *obd)
1559 {
1560         struct list_head work_list;
1561         ENTRY;
1562
1563         /* Move all of the exports from obd_exports to a work list, en masse. */
1564         INIT_LIST_HEAD(&work_list);
1565         spin_lock(&obd->obd_dev_lock);
1566         list_splice_init(&obd->obd_exports, &work_list);
1567         list_splice_init(&obd->obd_delayed_exports, &work_list);
1568         spin_unlock(&obd->obd_dev_lock);
1569
1570         if (!list_empty(&work_list)) {
1571                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1572                        "disconnecting them\n", obd->obd_minor, obd);
1573                 class_disconnect_export_list(&work_list,
1574                                              exp_flags_from_obd(obd));
1575         } else
1576                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1577                        obd->obd_minor, obd);
1578         EXIT;
1579 }
1580 EXPORT_SYMBOL(class_disconnect_exports);
1581
1582 /* Remove exports that have not completed recovery.
1583  */
1584 void class_disconnect_stale_exports(struct obd_device *obd,
1585                                     int (*test_export)(struct obd_export *))
1586 {
1587         struct list_head work_list;
1588         struct obd_export *exp, *n;
1589         int evicted = 0;
1590         ENTRY;
1591
1592         INIT_LIST_HEAD(&work_list);
1593         spin_lock(&obd->obd_dev_lock);
1594         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1595                                  exp_obd_chain) {
1596                 /* don't count self-export as client */
1597                 if (obd_uuid_equals(&exp->exp_client_uuid,
1598                                     &exp->exp_obd->obd_uuid))
1599                         continue;
1600
1601                 /* don't evict clients which have no slot in last_rcvd
1602                  * (e.g. lightweight connection) */
1603                 if (exp->exp_target_data.ted_lr_idx == -1)
1604                         continue;
1605
1606                 spin_lock(&exp->exp_lock);
1607                 if (exp->exp_failed || test_export(exp)) {
1608                         spin_unlock(&exp->exp_lock);
1609                         continue;
1610                 }
1611                 exp->exp_failed = 1;
1612                 spin_unlock(&exp->exp_lock);
1613
1614                 list_move(&exp->exp_obd_chain, &work_list);
1615                 evicted++;
1616                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1617                        obd->obd_name, exp->exp_client_uuid.uuid,
1618                        obd_export_nid2str(exp));
1619                 print_export_data(exp, "EVICTING", 0, D_HA);
1620         }
1621         spin_unlock(&obd->obd_dev_lock);
1622
1623         if (evicted)
1624                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1625                               obd->obd_name, evicted);
1626
1627         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1628                                                  OBD_OPT_ABORT_RECOV);
1629         EXIT;
1630 }
1631 EXPORT_SYMBOL(class_disconnect_stale_exports);
1632
1633 void class_fail_export(struct obd_export *exp)
1634 {
1635         int rc, already_failed;
1636
1637         spin_lock(&exp->exp_lock);
1638         already_failed = exp->exp_failed;
1639         exp->exp_failed = 1;
1640         spin_unlock(&exp->exp_lock);
1641
1642         if (already_failed) {
1643                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1644                        exp, exp->exp_client_uuid.uuid);
1645                 return;
1646         }
1647
1648         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1649                exp, exp->exp_client_uuid.uuid);
1650
1651         if (obd_dump_on_timeout)
1652                 libcfs_debug_dumplog();
1653
1654         /* need for safe call CDEBUG after obd_disconnect */
1655         class_export_get(exp);
1656
1657         /* Most callers into obd_disconnect are removing their own reference
1658          * (request, for example) in addition to the one from the hash table.
1659          * We don't have such a reference here, so make one. */
1660         class_export_get(exp);
1661         rc = obd_disconnect(exp);
1662         if (rc)
1663                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1664         else
1665                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1666                        exp, exp->exp_client_uuid.uuid);
1667         class_export_put(exp);
1668 }
1669 EXPORT_SYMBOL(class_fail_export);
1670
1671 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1672 {
1673         struct cfs_hash *nid_hash;
1674         struct obd_export *doomed_exp = NULL;
1675         int exports_evicted = 0;
1676
1677         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1678
1679         spin_lock(&obd->obd_dev_lock);
1680         /* umount has run already, so evict thread should leave
1681          * its task to umount thread now */
1682         if (obd->obd_stopping) {
1683                 spin_unlock(&obd->obd_dev_lock);
1684                 return exports_evicted;
1685         }
1686         nid_hash = obd->obd_nid_hash;
1687         cfs_hash_getref(nid_hash);
1688         spin_unlock(&obd->obd_dev_lock);
1689
1690         do {
1691                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1692                 if (doomed_exp == NULL)
1693                         break;
1694
1695                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1696                          "nid %s found, wanted nid %s, requested nid %s\n",
1697                          obd_export_nid2str(doomed_exp),
1698                          libcfs_nid2str(nid_key), nid);
1699                 LASSERTF(doomed_exp != obd->obd_self_export,
1700                          "self-export is hashed by NID?\n");
1701                 exports_evicted++;
1702                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1703                               "request\n", obd->obd_name,
1704                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1705                               obd_export_nid2str(doomed_exp));
1706                 class_fail_export(doomed_exp);
1707                 class_export_put(doomed_exp);
1708         } while (1);
1709
1710         cfs_hash_putref(nid_hash);
1711
1712         if (!exports_evicted)
1713                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1714                        obd->obd_name, nid);
1715         return exports_evicted;
1716 }
1717 EXPORT_SYMBOL(obd_export_evict_by_nid);
1718
1719 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1720 {
1721         struct cfs_hash *uuid_hash;
1722         struct obd_export *doomed_exp = NULL;
1723         struct obd_uuid doomed_uuid;
1724         int exports_evicted = 0;
1725
1726         spin_lock(&obd->obd_dev_lock);
1727         if (obd->obd_stopping) {
1728                 spin_unlock(&obd->obd_dev_lock);
1729                 return exports_evicted;
1730         }
1731         uuid_hash = obd->obd_uuid_hash;
1732         cfs_hash_getref(uuid_hash);
1733         spin_unlock(&obd->obd_dev_lock);
1734
1735         obd_str2uuid(&doomed_uuid, uuid);
1736         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1737                 CERROR("%s: can't evict myself\n", obd->obd_name);
1738                 cfs_hash_putref(uuid_hash);
1739                 return exports_evicted;
1740         }
1741
1742         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1743
1744         if (doomed_exp == NULL) {
1745                 CERROR("%s: can't disconnect %s: no exports found\n",
1746                        obd->obd_name, uuid);
1747         } else {
1748                 CWARN("%s: evicting %s at adminstrative request\n",
1749                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1750                 class_fail_export(doomed_exp);
1751                 class_export_put(doomed_exp);
1752                 exports_evicted++;
1753         }
1754         cfs_hash_putref(uuid_hash);
1755
1756         return exports_evicted;
1757 }
1758
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback, NULL by default.  print_export_data() calls it for an
 * export when its @locks argument is non-zero and the hook is set. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1763
1764 static void print_export_data(struct obd_export *exp, const char *status,
1765                               int locks, int debug_level)
1766 {
1767         struct ptlrpc_reply_state *rs;
1768         struct ptlrpc_reply_state *first_reply = NULL;
1769         int nreplies = 0;
1770
1771         spin_lock(&exp->exp_lock);
1772         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1773                             rs_exp_list) {
1774                 if (nreplies == 0)
1775                         first_reply = rs;
1776                 nreplies++;
1777         }
1778         spin_unlock(&exp->exp_lock);
1779
1780         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1781                "%p %s %llu stale:%d\n",
1782                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1783                obd_export_nid2str(exp),
1784                refcount_read(&exp->exp_handle.h_ref),
1785                atomic_read(&exp->exp_rpc_count),
1786                atomic_read(&exp->exp_cb_count),
1787                atomic_read(&exp->exp_locks_count),
1788                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1789                nreplies, first_reply, nreplies > 3 ? "..." : "",
1790                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1791 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1792         if (locks && class_export_dump_hook != NULL)
1793                 class_export_dump_hook(exp);
1794 #endif
1795 }
1796
1797 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1798 {
1799         struct obd_export *exp;
1800
1801         spin_lock(&obd->obd_dev_lock);
1802         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1803                 print_export_data(exp, "ACTIVE", locks, debug_level);
1804         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1805                 print_export_data(exp, "UNLINKED", locks, debug_level);
1806         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1807                 print_export_data(exp, "DELAYED", locks, debug_level);
1808         spin_unlock(&obd->obd_dev_lock);
1809 }
1810
1811 void obd_exports_barrier(struct obd_device *obd)
1812 {
1813         int waited = 2;
1814         LASSERT(list_empty(&obd->obd_exports));
1815         spin_lock(&obd->obd_dev_lock);
1816         while (!list_empty(&obd->obd_unlinked_exports)) {
1817                 spin_unlock(&obd->obd_dev_lock);
1818                 set_current_state(TASK_UNINTERRUPTIBLE);
1819                 schedule_timeout(cfs_time_seconds(waited));
1820                 if (waited > 5 && is_power_of_2(waited)) {
1821                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1822                                       "more than %d seconds. "
1823                                       "The obd refcount = %d. Is it stuck?\n",
1824                                       obd->obd_name, waited,
1825                                       atomic_read(&obd->obd_refcount));
1826                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1827                 }
1828                 waited *= 2;
1829                 spin_lock(&obd->obd_dev_lock);
1830         }
1831         spin_unlock(&obd->obd_dev_lock);
1832 }
1833 EXPORT_SYMBOL(obd_exports_barrier);
1834
1835 /**
1836  * Add export to the obd_zombe thread and notify it.
1837  */
1838 static void obd_zombie_export_add(struct obd_export *exp) {
1839         atomic_dec(&obd_stale_export_num);
1840         spin_lock(&exp->exp_obd->obd_dev_lock);
1841         LASSERT(!list_empty(&exp->exp_obd_chain));
1842         list_del_init(&exp->exp_obd_chain);
1843         spin_unlock(&exp->exp_obd->obd_dev_lock);
1844
1845         queue_work(zombie_wq, &exp->exp_zombie_work);
1846 }
1847
1848 /**
1849  * Add import to the obd_zombe thread and notify it.
1850  */
1851 static void obd_zombie_import_add(struct obd_import *imp) {
1852         LASSERT(imp->imp_sec == NULL);
1853
1854         queue_work(zombie_wq, &imp->imp_zombie_work);
1855 }
1856
1857 /**
1858  * wait when obd_zombie import/export queues become empty
1859  */
1860 void obd_zombie_barrier(void)
1861 {
1862         flush_workqueue(zombie_wq);
1863 }
1864 EXPORT_SYMBOL(obd_zombie_barrier);
1865
1866
1867 struct obd_export *obd_stale_export_get(void)
1868 {
1869         struct obd_export *exp = NULL;
1870         ENTRY;
1871
1872         spin_lock(&obd_stale_export_lock);
1873         if (!list_empty(&obd_stale_exports)) {
1874                 exp = list_entry(obd_stale_exports.next,
1875                                  struct obd_export, exp_stale_list);
1876                 list_del_init(&exp->exp_stale_list);
1877         }
1878         spin_unlock(&obd_stale_export_lock);
1879
1880         if (exp) {
1881                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1882                        atomic_read(&obd_stale_export_num));
1883         }
1884         RETURN(exp);
1885 }
1886 EXPORT_SYMBOL(obd_stale_export_get);
1887
1888 void obd_stale_export_put(struct obd_export *exp)
1889 {
1890         ENTRY;
1891
1892         LASSERT(list_empty(&exp->exp_stale_list));
1893         if (exp->exp_lock_hash &&
1894             atomic_read(&exp->exp_lock_hash->hs_count)) {
1895                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1896                        atomic_read(&obd_stale_export_num));
1897
1898                 spin_lock_bh(&exp->exp_bl_list_lock);
1899                 spin_lock(&obd_stale_export_lock);
1900                 /* Add to the tail if there is no blocked locks,
1901                  * to the head otherwise. */
1902                 if (list_empty(&exp->exp_bl_list))
1903                         list_add_tail(&exp->exp_stale_list,
1904                                       &obd_stale_exports);
1905                 else
1906                         list_add(&exp->exp_stale_list,
1907                                  &obd_stale_exports);
1908
1909                 spin_unlock(&obd_stale_export_lock);
1910                 spin_unlock_bh(&exp->exp_bl_list_lock);
1911         } else {
1912                 class_export_put(exp);
1913         }
1914         EXIT;
1915 }
1916 EXPORT_SYMBOL(obd_stale_export_put);
1917
1918 /**
1919  * Adjust the position of the export in the stale list,
1920  * i.e. move to the head of the list if is needed.
1921  **/
1922 void obd_stale_export_adjust(struct obd_export *exp)
1923 {
1924         LASSERT(exp != NULL);
1925         spin_lock_bh(&exp->exp_bl_list_lock);
1926         spin_lock(&obd_stale_export_lock);
1927
1928         if (!list_empty(&exp->exp_stale_list) &&
1929             !list_empty(&exp->exp_bl_list))
1930                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1931
1932         spin_unlock(&obd_stale_export_lock);
1933         spin_unlock_bh(&exp->exp_bl_list_lock);
1934 }
1935 EXPORT_SYMBOL(obd_stale_export_adjust);
1936
1937 /**
1938  * start destroy zombie import/export thread
1939  */
1940 int obd_zombie_impexp_init(void)
1941 {
1942         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1943         if (!zombie_wq)
1944                 return -ENOMEM;
1945
1946         return 0;
1947 }
1948
1949 /**
1950  * stop destroy zombie import/export thread
1951  */
1952 void obd_zombie_impexp_stop(void)
1953 {
1954         destroy_workqueue(zombie_wq);
1955         LASSERT(list_empty(&obd_stale_exports));
1956 }
1957
1958 /***** Kernel-userspace comm helpers *******/
1959
1960 /* Get length of entire message, including header */
1961 int kuc_len(int payload_len)
1962 {
1963         return sizeof(struct kuc_hdr) + payload_len;
1964 }
1965 EXPORT_SYMBOL(kuc_len);
1966
1967 /* Get a pointer to kuc header, given a ptr to the payload
1968  * @param p Pointer to payload area
1969  * @returns Pointer to kuc header
1970  */
1971 struct kuc_hdr * kuc_ptr(void *p)
1972 {
1973         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1974         LASSERT(lh->kuc_magic == KUC_MAGIC);
1975         return lh;
1976 }
1977 EXPORT_SYMBOL(kuc_ptr);
1978
1979 /* Alloc space for a message, and fill in header
1980  * @return Pointer to payload area
1981  */
1982 void *kuc_alloc(int payload_len, int transport, int type)
1983 {
1984         struct kuc_hdr *lh;
1985         int len = kuc_len(payload_len);
1986
1987         OBD_ALLOC(lh, len);
1988         if (lh == NULL)
1989                 return ERR_PTR(-ENOMEM);
1990
1991         lh->kuc_magic = KUC_MAGIC;
1992         lh->kuc_transport = transport;
1993         lh->kuc_msgtype = type;
1994         lh->kuc_msglen = len;
1995
1996         return (void *)(lh + 1);
1997 }
1998 EXPORT_SYMBOL(kuc_alloc);
1999
/* Free a message given the pointer to its payload area and the payload
 * length. */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr = kuc_ptr(p);
        int msg_len = kuc_len(payload_len);

        OBD_FREE(hdr, msg_len);
}
2006 EXPORT_SYMBOL(kuc_free);
2007
/* Record for one thread waiting in obd_get_request_slot(); it is placed on
 * that function's stack. */
struct obd_request_slot_waiter {
        /* linkage on client_obd::cl_flight_waiters; empty once a slot has
         * been handed to this waiter */
        struct list_head        orsw_entry;
        /* wait queue the waiting thread sleeps on */
        wait_queue_head_t       orsw_waitq;
        /* checked by the waiter, which then returns -EINTR; the code that
         * sets it is not in this part of the file */
        bool                    orsw_signaled;
};
2013
2014 static bool obd_request_slot_avail(struct client_obd *cli,
2015                                    struct obd_request_slot_waiter *orsw)
2016 {
2017         bool avail;
2018
2019         spin_lock(&cli->cl_loi_list_lock);
2020         avail = !!list_empty(&orsw->orsw_entry);
2021         spin_unlock(&cli->cl_loi_list_lock);
2022
2023         return avail;
2024 };
2025
2026 /*
2027  * For network flow control, the RPC sponsor needs to acquire a credit
2028  * before sending the RPC. The credits count for a connection is defined
2029  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2030  * the subsequent RPC sponsors need to wait until others released their
2031  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2032  */
2033 int obd_get_request_slot(struct client_obd *cli)
2034 {
2035         struct obd_request_slot_waiter   orsw;
2036         struct l_wait_info               lwi;
2037         int                              rc;
2038
2039         spin_lock(&cli->cl_loi_list_lock);
2040         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2041                 cli->cl_rpcs_in_flight++;
2042                 spin_unlock(&cli->cl_loi_list_lock);
2043                 return 0;
2044         }
2045
2046         init_waitqueue_head(&orsw.orsw_waitq);
2047         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2048         orsw.orsw_signaled = false;
2049         spin_unlock(&cli->cl_loi_list_lock);
2050
2051         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2052         rc = l_wait_event(orsw.orsw_waitq,
2053                           obd_request_slot_avail(cli, &orsw) ||
2054                           orsw.orsw_signaled,
2055                           &lwi);
2056
2057         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2058          * freed but other (such as obd_put_request_slot) is using it. */
2059         spin_lock(&cli->cl_loi_list_lock);
2060         if (rc != 0) {
2061                 if (!orsw.orsw_signaled) {
2062                         if (list_empty(&orsw.orsw_entry))
2063                                 cli->cl_rpcs_in_flight--;
2064                         else
2065                                 list_del(&orsw.orsw_entry);
2066                 }
2067         }
2068
2069         if (orsw.orsw_signaled) {
2070                 LASSERT(list_empty(&orsw.orsw_entry));
2071
2072                 rc = -EINTR;
2073         }
2074         spin_unlock(&cli->cl_loi_list_lock);
2075
2076         return rc;
2077 }
2078 EXPORT_SYMBOL(obd_get_request_slot);
2079
2080 void obd_put_request_slot(struct client_obd *cli)
2081 {
2082         struct obd_request_slot_waiter *orsw;
2083
2084         spin_lock(&cli->cl_loi_list_lock);
2085         cli->cl_rpcs_in_flight--;
2086
2087         /* If there is free slot, wakeup the first waiter. */
2088         if (!list_empty(&cli->cl_flight_waiters) &&
2089             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2090                 orsw = list_entry(cli->cl_flight_waiters.next,
2091                                   struct obd_request_slot_waiter, orsw_entry);
2092                 list_del_init(&orsw->orsw_entry);
2093                 cli->cl_rpcs_in_flight++;
2094                 wake_up(&orsw->orsw_waitq);
2095         }
2096         spin_unlock(&cli->cl_loi_list_lock);
2097 }
2098 EXPORT_SYMBOL(obd_put_request_slot);
2099
2100 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2101 {
2102         return cli->cl_max_rpcs_in_flight;
2103 }
2104 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2105
2106 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2107 {
2108         struct obd_request_slot_waiter *orsw;
2109         __u32                           old;
2110         int                             diff;
2111         int                             i;
2112         const char *type_name;
2113         int                             rc;
2114
2115         if (max > OBD_MAX_RIF_MAX || max < 1)
2116                 return -ERANGE;
2117
2118         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2119         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2120                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2121                  * strictly lower that max_rpcs_in_flight */
2122                 if (max < 2) {
2123                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2124                                "because it must be higher than "
2125                                "max_mod_rpcs_in_flight value",
2126                                cli->cl_import->imp_obd->obd_name);
2127                         return -ERANGE;
2128                 }
2129                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2130                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2131                         if (rc != 0)
2132                                 return rc;
2133                 }
2134         }
2135
2136         spin_lock(&cli->cl_loi_list_lock);
2137         old = cli->cl_max_rpcs_in_flight;
2138         cli->cl_max_rpcs_in_flight = max;
2139         client_adjust_max_dirty(cli);
2140
2141         diff = max - old;
2142
2143         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2144         for (i = 0; i < diff; i++) {
2145                 if (list_empty(&cli->cl_flight_waiters))
2146                         break;
2147
2148                 orsw = list_entry(cli->cl_flight_waiters.next,
2149                                   struct obd_request_slot_waiter, orsw_entry);
2150                 list_del_init(&orsw->orsw_entry);
2151                 cli->cl_rpcs_in_flight++;
2152                 wake_up(&orsw->orsw_waitq);
2153         }
2154         spin_unlock(&cli->cl_loi_list_lock);
2155
2156         return 0;
2157 }
2158 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2159
2160 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2161 {
2162         return cli->cl_max_mod_rpcs_in_flight;
2163 }
2164 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2165
2166 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2167 {
2168         struct obd_connect_data *ocd;
2169         __u16 maxmodrpcs;
2170         __u16 prev;
2171
2172         if (max > OBD_MAX_RIF_MAX || max < 1)
2173                 return -ERANGE;
2174
2175         /* cannot exceed or equal max_rpcs_in_flight */
2176         if (max >= cli->cl_max_rpcs_in_flight) {
2177                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2178                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2179                        cli->cl_import->imp_obd->obd_name,
2180                        max, cli->cl_max_rpcs_in_flight);
2181                 return -ERANGE;
2182         }
2183
2184         /* cannot exceed max modify RPCs in flight supported by the server */
2185         ocd = &cli->cl_import->imp_connect_data;
2186         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2187                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2188         else
2189                 maxmodrpcs = 1;
2190         if (max > maxmodrpcs) {
2191                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2192                        "higher than max_mod_rpcs_per_client value (%hu) "
2193                        "returned by the server at connection\n",
2194                        cli->cl_import->imp_obd->obd_name,
2195                        max, maxmodrpcs);
2196                 return -ERANGE;
2197         }
2198
2199         spin_lock(&cli->cl_mod_rpcs_lock);
2200
2201         prev = cli->cl_max_mod_rpcs_in_flight;
2202         cli->cl_max_mod_rpcs_in_flight = max;
2203
2204         /* wakeup waiters if limit has been increased */
2205         if (cli->cl_max_mod_rpcs_in_flight > prev)
2206                 wake_up(&cli->cl_mod_rpcs_waitq);
2207
2208         spin_unlock(&cli->cl_mod_rpcs_lock);
2209
2210         return 0;
2211 }
2212 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2213
2214 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2215                                struct seq_file *seq)
2216 {
2217         unsigned long mod_tot = 0, mod_cum;
2218         struct timespec64 now;
2219         int i;
2220
2221         ktime_get_real_ts64(&now);
2222
2223         spin_lock(&cli->cl_mod_rpcs_lock);
2224
2225         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2226                    (s64)now.tv_sec, now.tv_nsec);
2227         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2228                    cli->cl_mod_rpcs_in_flight);
2229
2230         seq_printf(seq, "\n\t\t\tmodify\n");
2231         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2232
2233         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2234
2235         mod_cum = 0;
2236         for (i = 0; i < OBD_HIST_MAX; i++) {
2237                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2238                 mod_cum += mod;
2239                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2240                            i, mod, pct(mod, mod_tot),
2241                            pct(mod_cum, mod_tot));
2242                 if (mod_cum == mod_tot)
2243                         break;
2244         }
2245
2246         spin_unlock(&cli->cl_mod_rpcs_lock);
2247
2248         return 0;
2249 }
2250 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2251
2252 /* The number of modify RPCs sent in parallel is limited
2253  * because the server has a finite number of slots per client to
2254  * store request result and ensure reply reconstruction when needed.
2255  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2256  * that takes into account server limit and cl_max_rpcs_in_flight
2257  * value.
2258  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2259  * one close request is allowed above the maximum.
2260  */
2261 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2262                                                  bool close_req)
2263 {
2264         bool avail;
2265
2266         /* A slot is available if
2267          * - number of modify RPCs in flight is less than the max
2268          * - it's a close RPC and no other close request is in flight
2269          */
2270         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2271                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2272
2273         return avail;
2274 }
2275
2276 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2277                                          bool close_req)
2278 {
2279         bool avail;
2280
2281         spin_lock(&cli->cl_mod_rpcs_lock);
2282         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2283         spin_unlock(&cli->cl_mod_rpcs_lock);
2284         return avail;
2285 }
2286
2287
2288 /* Get a modify RPC slot from the obd client @cli according
2289  * to the kind of operation @opc that is going to be sent
2290  * and the intent @it of the operation if it applies.
2291  * If the maximum number of modify RPCs in flight is reached
2292  * the thread is put to sleep.
2293  * Returns the tag to be set in the request message. Tag 0
2294  * is reserved for non-modifying requests.
2295  */
2296 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2297 {
2298         bool                    close_req = false;
2299         __u16                   i, max;
2300
2301         if (opc == MDS_CLOSE)
2302                 close_req = true;
2303
2304         do {
2305                 spin_lock(&cli->cl_mod_rpcs_lock);
2306                 max = cli->cl_max_mod_rpcs_in_flight;
2307                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2308                         /* there is a slot available */
2309                         cli->cl_mod_rpcs_in_flight++;
2310                         if (close_req)
2311                                 cli->cl_close_rpcs_in_flight++;
2312                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2313                                          cli->cl_mod_rpcs_in_flight);
2314                         /* find a free tag */
2315                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2316                                                 max + 1);
2317                         LASSERT(i < OBD_MAX_RIF_MAX);
2318                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2319                         spin_unlock(&cli->cl_mod_rpcs_lock);
2320                         /* tag 0 is reserved for non-modify RPCs */
2321
2322                         CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2323                                "opc %u, max %hu\n",
2324                                cli->cl_import->imp_obd->obd_name,
2325                                i + 1, opc, max);
2326
2327                         return i + 1;
2328                 }
2329                 spin_unlock(&cli->cl_mod_rpcs_lock);
2330
2331                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2332                        "opc %u, max %hu\n",
2333                        cli->cl_import->imp_obd->obd_name, opc, max);
2334
2335                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2336                                           obd_mod_rpc_slot_avail(cli,
2337                                                                  close_req));
2338         } while (true);
2339 }
2340 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2341
2342 /* Put a modify RPC slot from the obd client @cli according
2343  * to the kind of operation @opc that has been sent.
2344  */
2345 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2346 {
2347         bool                    close_req = false;
2348
2349         if (tag == 0)
2350                 return;
2351
2352         if (opc == MDS_CLOSE)
2353                 close_req = true;
2354
2355         spin_lock(&cli->cl_mod_rpcs_lock);
2356         cli->cl_mod_rpcs_in_flight--;
2357         if (close_req)
2358                 cli->cl_close_rpcs_in_flight--;
2359         /* release the tag in the bitmap */
2360         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2361         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2362         spin_unlock(&cli->cl_mod_rpcs_lock);
2363         wake_up(&cli->cl_mod_rpcs_waitq);
2364 }
2365 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2366