Whamcloud - gitweb
LU-9859 ptlrpc: change imp_refcount to refcount_t
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Reader/writer lock taken around lookups and updates of obd_devs[]. */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device's obd_minor is its index here. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache that struct obd_device instances are allocated from. */
static struct kmem_cache *obd_device_cachep;
/* kobject type of every obd_type kobject; initialized further down. */
static struct kobj_type class_ktype;
/* NOTE(review): presumably the workqueue that destroys zombie exports and
 * imports -- its users are outside this part of the file, confirm. */
static struct workqueue_struct *zombie_wq;

static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* NOTE(review): list, lock and counter for stale exports; presumably the
 * lock guards the list -- the code using them is not visible here. */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/* Hook used when destroying an export to release its ptlrpc connection. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         module_put(type->typ_dt_ops->o_owner);
158         atomic_dec(&type->typ_refcnt);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         OBD_FREE(type, sizeof(*type));
176 }
177
178 static struct kobj_type class_ktype = {
179         .sysfs_ops      = &lustre_sysfs_ops,
180         .release        = class_sysfs_release,
181 };
182
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
185 {
186         struct dentry *symlink;
187         struct obd_type *type;
188         int rc;
189
190         type = class_search_type(name);
191         if (type) {
192                 kobject_put(&type->typ_kobj);
193                 return ERR_PTR(-EEXIST);
194         }
195
196         OBD_ALLOC(type, sizeof(*type));
197         if (!type)
198                 return ERR_PTR(-ENOMEM);
199
200         type->typ_kobj.kset = lustre_kset;
201         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202                                   &lustre_kset->kobj, "%s", name);
203         if (rc)
204                 return ERR_PTR(rc);
205
206         symlink = debugfs_create_dir(name, debugfs_lustre_root);
207         if (IS_ERR_OR_NULL(symlink)) {
208                 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
209                 kobject_put(&type->typ_kobj);
210                 return ERR_PTR(rc);
211         }
212         type->typ_debugfs_entry = symlink;
213         type->typ_sym_filter = true;
214
215         if (enable_proc) {
216                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
217                                                       NULL, NULL);
218                 if (IS_ERR(type->typ_procroot)) {
219                         CERROR("%s: can't create compat proc entry: %d\n",
220                                name, (int)PTR_ERR(type->typ_procroot));
221                         type->typ_procroot = NULL;
222                 }
223         }
224
225         return type;
226 }
227 EXPORT_SYMBOL(class_add_symlinks);
228 #endif /* HAVE_SERVER_SUPPORT */
229
230 #define CLASS_MAX_NAME 1024
231
232 int class_register_type(const struct obd_ops *dt_ops,
233                         const struct md_ops *md_ops,
234                         bool enable_proc, struct lprocfs_vars *vars,
235                         const char *name, struct lu_device_type *ldt)
236 {
237         struct obd_type *type;
238         int rc;
239
240         ENTRY;
241         /* sanity check */
242         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
243
244         type = class_search_type(name);
245         if (type) {
246 #ifdef HAVE_SERVER_SUPPORT
247                 if (type->typ_sym_filter)
248                         goto dir_exist;
249 #endif /* HAVE_SERVER_SUPPORT */
250                 kobject_put(&type->typ_kobj);
251                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
252                 RETURN(-EEXIST);
253         }
254
255         OBD_ALLOC(type, sizeof(*type));
256         if (type == NULL)
257                 RETURN(-ENOMEM);
258
259         type->typ_kobj.kset = lustre_kset;
260         kobject_init(&type->typ_kobj, &class_ktype);
261 #ifdef HAVE_SERVER_SUPPORT
262 dir_exist:
263 #endif /* HAVE_SERVER_SUPPORT */
264
265         type->typ_dt_ops = dt_ops;
266         type->typ_md_ops = md_ops;
267
268 #ifdef HAVE_SERVER_SUPPORT
269         if (type->typ_sym_filter) {
270                 type->typ_sym_filter = false;
271                 kobject_put(&type->typ_kobj);
272                 goto setup_ldt;
273         }
274 #endif
275 #ifdef CONFIG_PROC_FS
276         if (enable_proc && !type->typ_procroot) {
277                 type->typ_procroot = lprocfs_register(name,
278                                                       proc_lustre_root,
279                                                       NULL, type);
280                 if (IS_ERR(type->typ_procroot)) {
281                         rc = PTR_ERR(type->typ_procroot);
282                         type->typ_procroot = NULL;
283                         GOTO(failed, rc);
284                 }
285         }
286 #endif
287         type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
288                                                     vars, type);
289         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
290                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
291                                              : -ENOMEM;
292                 type->typ_debugfs_entry = NULL;
293                 GOTO(failed, rc);
294         }
295
296         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
297         if (rc)
298                 GOTO(failed, rc);
299 #ifdef HAVE_SERVER_SUPPORT
300 setup_ldt:
301 #endif
302         if (ldt) {
303                 type->typ_lu = ldt;
304                 rc = lu_device_type_init(ldt);
305                 if (rc)
306                         GOTO(failed, rc);
307         }
308
309         RETURN(0);
310
311 failed:
312         kobject_put(&type->typ_kobj);
313
314         RETURN(rc);
315 }
316 EXPORT_SYMBOL(class_register_type);
317
318 int class_unregister_type(const char *name)
319 {
320         struct obd_type *type = class_search_type(name);
321         int rc = 0;
322         ENTRY;
323
324         if (!type) {
325                 CERROR("unknown obd type\n");
326                 RETURN(-EINVAL);
327         }
328
329         if (atomic_read(&type->typ_refcnt)) {
330                 CERROR("type %s has refcount (%d)\n", name,
331                        atomic_read(&type->typ_refcnt));
332                 /* This is a bad situation, let's make the best of it */
333                 /* Remove ops, but leave the name for debugging */
334                 type->typ_dt_ops = NULL;
335                 type->typ_md_ops = NULL;
336                 GOTO(out_put, rc = -EBUSY);
337         }
338
339         /* Put the final ref */
340         kobject_put(&type->typ_kobj);
341 out_put:
342         /* Put the ref returned by class_search_type() */
343         kobject_put(&type->typ_kobj);
344
345         RETURN(rc);
346 } /* class_unregister_type */
347 EXPORT_SYMBOL(class_unregister_type);
348
349 /**
350  * Create a new obd device.
351  *
352  * Allocate the new obd_device and initialize it.
353  *
354  * \param[in] type_name obd device type string.
355  * \param[in] name      obd device name.
356  * \param[in] uuid      obd device UUID
357  *
358  * \retval newdev         pointer to created obd_device
359  * \retval ERR_PTR(errno) on error
360  */
361 struct obd_device *class_newdev(const char *type_name, const char *name,
362                                 const char *uuid)
363 {
364         struct obd_device *newdev;
365         struct obd_type *type = NULL;
366         ENTRY;
367
368         if (strlen(name) >= MAX_OBD_NAME) {
369                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
370                 RETURN(ERR_PTR(-EINVAL));
371         }
372
373         type = class_get_type(type_name);
374         if (type == NULL){
375                 CERROR("OBD: unknown type: %s\n", type_name);
376                 RETURN(ERR_PTR(-ENODEV));
377         }
378
379         newdev = obd_device_alloc();
380         if (newdev == NULL) {
381                 class_put_type(type);
382                 RETURN(ERR_PTR(-ENOMEM));
383         }
384         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
385         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
386         newdev->obd_type = type;
387         newdev->obd_minor = -1;
388
389         rwlock_init(&newdev->obd_pool_lock);
390         newdev->obd_pool_limit = 0;
391         newdev->obd_pool_slv = 0;
392
393         INIT_LIST_HEAD(&newdev->obd_exports);
394         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
395         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
396         INIT_LIST_HEAD(&newdev->obd_exports_timed);
397         INIT_LIST_HEAD(&newdev->obd_nid_stats);
398         spin_lock_init(&newdev->obd_nid_lock);
399         spin_lock_init(&newdev->obd_dev_lock);
400         mutex_init(&newdev->obd_dev_mutex);
401         spin_lock_init(&newdev->obd_osfs_lock);
402         /* newdev->obd_osfs_age must be set to a value in the distant
403          * past to guarantee a fresh statfs is fetched on mount. */
404         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
405
406         /* XXX belongs in setup not attach  */
407         init_rwsem(&newdev->obd_observer_link_sem);
408         /* recovery data */
409         spin_lock_init(&newdev->obd_recovery_task_lock);
410         init_waitqueue_head(&newdev->obd_next_transno_waitq);
411         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
412         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
413         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
414         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
415         INIT_LIST_HEAD(&newdev->obd_evict_list);
416         INIT_LIST_HEAD(&newdev->obd_lwp_list);
417
418         llog_group_init(&newdev->obd_olg);
419         /* Detach drops this */
420         atomic_set(&newdev->obd_refcount, 1);
421         lu_ref_init(&newdev->obd_reference);
422         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
423
424         newdev->obd_conn_inprogress = 0;
425
426         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
427
428         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
429                newdev->obd_name, newdev);
430
431         return newdev;
432 }
433
434 /**
435  * Free obd device.
436  *
437  * \param[in] obd obd_device to be freed
438  *
439  * \retval none
440  */
441 void class_free_dev(struct obd_device *obd)
442 {
443         struct obd_type *obd_type = obd->obd_type;
444
445         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
446                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
448                  "obd %p != obd_devs[%d] %p\n",
449                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
450         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
451                  "obd_refcount should be 0, not %d\n",
452                  atomic_read(&obd->obd_refcount));
453         LASSERT(obd_type != NULL);
454
455         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
456                obd->obd_name, obd->obd_type->typ_name);
457
458         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
459                          obd->obd_name, obd->obd_uuid.uuid);
460         if (obd->obd_stopping) {
461                 int err;
462
463                 /* If we're not stopping, we were never set up */
464                 err = obd_cleanup(obd);
465                 if (err)
466                         CERROR("Cleanup %s returned %d\n",
467                                 obd->obd_name, err);
468         }
469
470         obd_device_free(obd);
471
472         class_put_type(obd_type);
473 }
474
475 /**
476  * Unregister obd device.
477  *
478  * Free slot in obd_dev[] used by \a obd.
479  *
480  * \param[in] new_obd obd_device to be unregistered
481  *
482  * \retval none
483  */
484 void class_unregister_device(struct obd_device *obd)
485 {
486         write_lock(&obd_dev_lock);
487         if (obd->obd_minor >= 0) {
488                 LASSERT(obd_devs[obd->obd_minor] == obd);
489                 obd_devs[obd->obd_minor] = NULL;
490                 obd->obd_minor = -1;
491         }
492         write_unlock(&obd_dev_lock);
493 }
494
495 /**
496  * Register obd device.
497  *
498  * Find free slot in obd_devs[], fills it with \a new_obd.
499  *
500  * \param[in] new_obd obd_device to be registered
501  *
502  * \retval 0          success
503  * \retval -EEXIST    device with this name is registered
504  * \retval -EOVERFLOW obd_devs[] is full
505  */
506 int class_register_device(struct obd_device *new_obd)
507 {
508         int ret = 0;
509         int i;
510         int new_obd_minor = 0;
511         bool minor_assign = false;
512         bool retried = false;
513
514 again:
515         write_lock(&obd_dev_lock);
516         for (i = 0; i < class_devno_max(); i++) {
517                 struct obd_device *obd = class_num2obd(i);
518
519                 if (obd != NULL &&
520                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
521
522                         if (!retried) {
523                                 write_unlock(&obd_dev_lock);
524
525                                 /* the obd_device could be waited to be
526                                  * destroyed by the "obd_zombie_impexp_thread".
527                                  */
528                                 obd_zombie_barrier();
529                                 retried = true;
530                                 goto again;
531                         }
532
533                         CERROR("%s: already exists, won't add\n",
534                                obd->obd_name);
535                         /* in case we found a free slot before duplicate */
536                         minor_assign = false;
537                         ret = -EEXIST;
538                         break;
539                 }
540                 if (!minor_assign && obd == NULL) {
541                         new_obd_minor = i;
542                         minor_assign = true;
543                 }
544         }
545
546         if (minor_assign) {
547                 new_obd->obd_minor = new_obd_minor;
548                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
549                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
550                 obd_devs[new_obd_minor] = new_obd;
551         } else {
552                 if (ret == 0) {
553                         ret = -EOVERFLOW;
554                         CERROR("%s: all %u/%u devices used, increase "
555                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
556                                i, class_devno_max(), ret);
557                 }
558         }
559         write_unlock(&obd_dev_lock);
560
561         RETURN(ret);
562 }
563
564 static int class_name2dev_nolock(const char *name)
565 {
566         int i;
567
568         if (!name)
569                 return -1;
570
571         for (i = 0; i < class_devno_max(); i++) {
572                 struct obd_device *obd = class_num2obd(i);
573
574                 if (obd && strcmp(name, obd->obd_name) == 0) {
575                         /* Make sure we finished attaching before we give
576                            out any references */
577                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
578                         if (obd->obd_attached) {
579                                 return i;
580                         }
581                         break;
582                 }
583         }
584
585         return -1;
586 }
587
588 int class_name2dev(const char *name)
589 {
590         int i;
591
592         if (!name)
593                 return -1;
594
595         read_lock(&obd_dev_lock);
596         i = class_name2dev_nolock(name);
597         read_unlock(&obd_dev_lock);
598
599         return i;
600 }
601 EXPORT_SYMBOL(class_name2dev);
602
603 struct obd_device *class_name2obd(const char *name)
604 {
605         int dev = class_name2dev(name);
606
607         if (dev < 0 || dev > class_devno_max())
608                 return NULL;
609         return class_num2obd(dev);
610 }
611 EXPORT_SYMBOL(class_name2obd);
612
613 int class_uuid2dev_nolock(struct obd_uuid *uuid)
614 {
615         int i;
616
617         for (i = 0; i < class_devno_max(); i++) {
618                 struct obd_device *obd = class_num2obd(i);
619
620                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
621                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
622                         return i;
623                 }
624         }
625
626         return -1;
627 }
628
629 int class_uuid2dev(struct obd_uuid *uuid)
630 {
631         int i;
632
633         read_lock(&obd_dev_lock);
634         i = class_uuid2dev_nolock(uuid);
635         read_unlock(&obd_dev_lock);
636
637         return i;
638 }
639 EXPORT_SYMBOL(class_uuid2dev);
640
641 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
642 {
643         int dev = class_uuid2dev(uuid);
644         if (dev < 0)
645                 return NULL;
646         return class_num2obd(dev);
647 }
648 EXPORT_SYMBOL(class_uuid2obd);
649
650 /**
651  * Get obd device from ::obd_devs[]
652  *
653  * \param num [in] array index
654  *
655  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
656  *         otherwise return the obd device there.
657  */
658 struct obd_device *class_num2obd(int num)
659 {
660         struct obd_device *obd = NULL;
661
662         if (num < class_devno_max()) {
663                 obd = obd_devs[num];
664                 if (obd == NULL)
665                         return NULL;
666
667                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
668                          "%p obd_magic %08x != %08x\n",
669                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
670                 LASSERTF(obd->obd_minor == num,
671                          "%p obd_minor %0d != %0d\n",
672                          obd, obd->obd_minor, num);
673         }
674
675         return obd;
676 }
677
678 /**
679  * Find obd in obd_dev[] by name or uuid.
680  *
681  * Increment obd's refcount if found.
682  *
683  * \param[in] str obd name or uuid
684  *
685  * \retval NULL    if not found
686  * \retval target  pointer to found obd_device
687  */
688 struct obd_device *class_dev_by_str(const char *str)
689 {
690         struct obd_device *target = NULL;
691         struct obd_uuid tgtuuid;
692         int rc;
693
694         obd_str2uuid(&tgtuuid, str);
695
696         read_lock(&obd_dev_lock);
697         rc = class_uuid2dev_nolock(&tgtuuid);
698         if (rc < 0)
699                 rc = class_name2dev_nolock(str);
700
701         if (rc >= 0)
702                 target = class_num2obd(rc);
703
704         if (target != NULL)
705                 class_incref(target, "find", current);
706         read_unlock(&obd_dev_lock);
707
708         RETURN(target);
709 }
710 EXPORT_SYMBOL(class_dev_by_str);
711
712 /**
713  * Get obd devices count. Device in any
714  *    state are counted
715  * \retval obd device count
716  */
717 int get_devices_count(void)
718 {
719         int index, max_index = class_devno_max(), dev_count = 0;
720
721         read_lock(&obd_dev_lock);
722         for (index = 0; index <= max_index; index++) {
723                 struct obd_device *obd = class_num2obd(index);
724                 if (obd != NULL)
725                         dev_count++;
726         }
727         read_unlock(&obd_dev_lock);
728
729         return dev_count;
730 }
731 EXPORT_SYMBOL(get_devices_count);
732
733 void class_obd_list(void)
734 {
735         char *status;
736         int i;
737
738         read_lock(&obd_dev_lock);
739         for (i = 0; i < class_devno_max(); i++) {
740                 struct obd_device *obd = class_num2obd(i);
741
742                 if (obd == NULL)
743                         continue;
744                 if (obd->obd_stopping)
745                         status = "ST";
746                 else if (obd->obd_set_up)
747                         status = "UP";
748                 else if (obd->obd_attached)
749                         status = "AT";
750                 else
751                         status = "--";
752                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
753                          i, status, obd->obd_type->typ_name,
754                          obd->obd_name, obd->obd_uuid.uuid,
755                          atomic_read(&obd->obd_refcount));
756         }
757         read_unlock(&obd_dev_lock);
758 }
759
760 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
761  * specified, then only the client with that uuid is returned,
762  * otherwise any client connected to the tgt is returned.
763  */
764 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
765                                          const char *type_name,
766                                          struct obd_uuid *grp_uuid)
767 {
768         int i;
769
770         read_lock(&obd_dev_lock);
771         for (i = 0; i < class_devno_max(); i++) {
772                 struct obd_device *obd = class_num2obd(i);
773
774                 if (obd == NULL)
775                         continue;
776                 if ((strncmp(obd->obd_type->typ_name, type_name,
777                              strlen(type_name)) == 0)) {
778                         if (obd_uuid_equals(tgt_uuid,
779                                             &obd->u.cli.cl_target_uuid) &&
780                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
781                                                          &obd->obd_uuid) : 1)) {
782                                 read_unlock(&obd_dev_lock);
783                                 return obd;
784                         }
785                 }
786         }
787         read_unlock(&obd_dev_lock);
788
789         return NULL;
790 }
791 EXPORT_SYMBOL(class_find_client_obd);
792
793 /* Iterate the obd_device list looking devices have grp_uuid. Start
794  * searching at *next, and if a device is found, the next index to look
795  * at is saved in *next. If next is NULL, then the first matching device
796  * will always be returned.
797  */
798 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
799 {
800         int i;
801
802         if (next == NULL)
803                 i = 0;
804         else if (*next >= 0 && *next < class_devno_max())
805                 i = *next;
806         else
807                 return NULL;
808
809         read_lock(&obd_dev_lock);
810         for (; i < class_devno_max(); i++) {
811                 struct obd_device *obd = class_num2obd(i);
812
813                 if (obd == NULL)
814                         continue;
815                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
816                         if (next != NULL)
817                                 *next = i+1;
818                         read_unlock(&obd_dev_lock);
819                         return obd;
820                 }
821         }
822         read_unlock(&obd_dev_lock);
823
824         return NULL;
825 }
826 EXPORT_SYMBOL(class_devices_in_group);
827
828 /**
829  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
830  * adjust sptlrpc settings accordingly.
831  */
832 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
833 {
834         struct obd_device  *obd;
835         const char         *type;
836         int                 i, rc = 0, rc2;
837
838         LASSERT(namelen > 0);
839
840         read_lock(&obd_dev_lock);
841         for (i = 0; i < class_devno_max(); i++) {
842                 obd = class_num2obd(i);
843
844                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
845                         continue;
846
847                 /* only notify mdc, osc, osp, lwp, mdt, ost
848                  * because only these have a -sptlrpc llog */
849                 type = obd->obd_type->typ_name;
850                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
851                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
852                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
853                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
854                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
855                     strcmp(type, LUSTRE_OST_NAME) != 0)
856                         continue;
857
858                 if (strncmp(obd->obd_name, fsname, namelen))
859                         continue;
860
861                 class_incref(obd, __FUNCTION__, obd);
862                 read_unlock(&obd_dev_lock);
863                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
864                                          sizeof(KEY_SPTLRPC_CONF),
865                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
866                 rc = rc ? rc : rc2;
867                 class_decref(obd, __FUNCTION__, obd);
868                 read_lock(&obd_dev_lock);
869         }
870         read_unlock(&obd_dev_lock);
871         return rc;
872 }
873 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
874
875 void obd_cleanup_caches(void)
876 {
877         ENTRY;
878         if (obd_device_cachep) {
879                 kmem_cache_destroy(obd_device_cachep);
880                 obd_device_cachep = NULL;
881         }
882
883         EXIT;
884 }
885
886 int obd_init_caches(void)
887 {
888         int rc;
889         ENTRY;
890
891         LASSERT(obd_device_cachep == NULL);
892         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
893                                 sizeof(struct obd_device),
894                                 0, 0, 0, sizeof(struct obd_device), NULL);
895         if (!obd_device_cachep)
896                 GOTO(out, rc = -ENOMEM);
897
898         RETURN(0);
899 out:
900         obd_cleanup_caches();
901         RETURN(rc);
902 }
903
904 static const char export_handle_owner[] = "export";
905
906 /* map connection to client */
907 struct obd_export *class_conn2export(struct lustre_handle *conn)
908 {
909         struct obd_export *export;
910         ENTRY;
911
912         if (!conn) {
913                 CDEBUG(D_CACHE, "looking for null handle\n");
914                 RETURN(NULL);
915         }
916
917         if (conn->cookie == -1) {  /* this means assign a new connection */
918                 CDEBUG(D_CACHE, "want a new connection\n");
919                 RETURN(NULL);
920         }
921
922         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
923         export = class_handle2object(conn->cookie, export_handle_owner);
924         RETURN(export);
925 }
926 EXPORT_SYMBOL(class_conn2export);
927
928 struct obd_device *class_exp2obd(struct obd_export *exp)
929 {
930         if (exp)
931                 return exp->exp_obd;
932         return NULL;
933 }
934 EXPORT_SYMBOL(class_exp2obd);
935
936 struct obd_import *class_exp2cliimp(struct obd_export *exp)
937 {
938         struct obd_device *obd = exp->exp_obd;
939         if (obd == NULL)
940                 return NULL;
941         return obd->u.cli.cl_import;
942 }
943 EXPORT_SYMBOL(class_exp2cliimp);
944
945 /* Export management functions */
946 static void class_export_destroy(struct obd_export *exp)
947 {
948         struct obd_device *obd = exp->exp_obd;
949         ENTRY;
950
951         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
952         LASSERT(obd != NULL);
953
954         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
955                exp->exp_client_uuid.uuid, obd->obd_name);
956
957         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
958         if (exp->exp_connection)
959                 ptlrpc_put_connection_superhack(exp->exp_connection);
960
961         LASSERT(list_empty(&exp->exp_outstanding_replies));
962         LASSERT(list_empty(&exp->exp_uncommitted_replies));
963         LASSERT(list_empty(&exp->exp_req_replay_queue));
964         LASSERT(list_empty(&exp->exp_hp_rpcs));
965         obd_destroy_export(exp);
966         /* self export doesn't hold a reference to an obd, although it
967          * exists until freeing of the obd */
968         if (exp != obd->obd_self_export)
969                 class_decref(obd, "export", exp);
970
971         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
972         kfree_rcu(exp, exp_handle.h_rcu);
973         EXIT;
974 }
975
976 struct obd_export *class_export_get(struct obd_export *exp)
977 {
978         refcount_inc(&exp->exp_handle.h_ref);
979         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
980                refcount_read(&exp->exp_handle.h_ref));
981         return exp;
982 }
983 EXPORT_SYMBOL(class_export_get);
984
985 void class_export_put(struct obd_export *exp)
986 {
987         LASSERT(exp != NULL);
988         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
989         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
990         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
991                refcount_read(&exp->exp_handle.h_ref) - 1);
992
993         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
994                 struct obd_device *obd = exp->exp_obd;
995
996                 CDEBUG(D_IOCTL, "final put %p/%s\n",
997                        exp, exp->exp_client_uuid.uuid);
998
999                 /* release nid stat refererence */
1000                 lprocfs_exp_cleanup(exp);
1001
1002                 if (exp == obd->obd_self_export) {
1003                         /* self export should be destroyed without
1004                          * zombie thread as it doesn't hold a
1005                          * reference to obd and doesn't hold any
1006                          * resources */
1007                         class_export_destroy(exp);
1008                         /* self export is destroyed, no class
1009                          * references exist and it is safe to free
1010                          * obd */
1011                         class_free_dev(obd);
1012                 } else {
1013                         LASSERT(!list_empty(&exp->exp_obd_chain));
1014                         obd_zombie_export_add(exp);
1015                 }
1016
1017         }
1018 }
1019 EXPORT_SYMBOL(class_export_put);
1020
1021 static void obd_zombie_exp_cull(struct work_struct *ws)
1022 {
1023         struct obd_export *export;
1024
1025         export = container_of(ws, struct obd_export, exp_zombie_work);
1026         class_export_destroy(export);
1027 }
1028
1029 /* Creates a new export, adds it to the hash table, and returns a
1030  * pointer to it. The refcount is 2: one for the hash reference, and
1031  * one for the pointer returned by this function. */
1032 struct obd_export *__class_new_export(struct obd_device *obd,
1033                                       struct obd_uuid *cluuid, bool is_self)
1034 {
1035         struct obd_export *export;
1036         int rc = 0;
1037         ENTRY;
1038
1039         OBD_ALLOC_PTR(export);
1040         if (!export)
1041                 return ERR_PTR(-ENOMEM);
1042
1043         export->exp_conn_cnt = 0;
1044         export->exp_lock_hash = NULL;
1045         export->exp_flock_hash = NULL;
1046         /* 2 = class_handle_hash + last */
1047         refcount_set(&export->exp_handle.h_ref, 2);
1048         atomic_set(&export->exp_rpc_count, 0);
1049         atomic_set(&export->exp_cb_count, 0);
1050         atomic_set(&export->exp_locks_count, 0);
1051 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1052         INIT_LIST_HEAD(&export->exp_locks_list);
1053         spin_lock_init(&export->exp_locks_list_guard);
1054 #endif
1055         atomic_set(&export->exp_replay_count, 0);
1056         export->exp_obd = obd;
1057         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1058         spin_lock_init(&export->exp_uncommitted_replies_lock);
1059         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1060         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1061         INIT_HLIST_NODE(&export->exp_handle.h_link);
1062         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1063         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1064         class_handle_hash(&export->exp_handle, export_handle_owner);
1065         export->exp_last_request_time = ktime_get_real_seconds();
1066         spin_lock_init(&export->exp_lock);
1067         spin_lock_init(&export->exp_rpc_lock);
1068         INIT_HLIST_NODE(&export->exp_nid_hash);
1069         INIT_HLIST_NODE(&export->exp_gen_hash);
1070         spin_lock_init(&export->exp_bl_list_lock);
1071         INIT_LIST_HEAD(&export->exp_bl_list);
1072         INIT_LIST_HEAD(&export->exp_stale_list);
1073         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1074
1075         export->exp_sp_peer = LUSTRE_SP_ANY;
1076         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1077         export->exp_client_uuid = *cluuid;
1078         obd_init_export(export);
1079
1080         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1081
1082         spin_lock(&obd->obd_dev_lock);
1083         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1084                 /* shouldn't happen, but might race */
1085                 if (obd->obd_stopping)
1086                         GOTO(exit_unlock, rc = -ENODEV);
1087
1088                 rc = obd_uuid_add(obd, export);
1089                 if (rc != 0) {
1090                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1091                                       obd->obd_name, cluuid->uuid, rc);
1092                         GOTO(exit_unlock, rc = -EALREADY);
1093                 }
1094         }
1095
1096         if (!is_self) {
1097                 class_incref(obd, "export", export);
1098                 list_add_tail(&export->exp_obd_chain_timed,
1099                               &obd->obd_exports_timed);
1100                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1101                 obd->obd_num_exports++;
1102         } else {
1103                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1104                 INIT_LIST_HEAD(&export->exp_obd_chain);
1105         }
1106         spin_unlock(&obd->obd_dev_lock);
1107         RETURN(export);
1108
1109 exit_unlock:
1110         spin_unlock(&obd->obd_dev_lock);
1111         class_handle_unhash(&export->exp_handle);
1112         obd_destroy_export(export);
1113         OBD_FREE_PTR(export);
1114         return ERR_PTR(rc);
1115 }
1116
1117 struct obd_export *class_new_export(struct obd_device *obd,
1118                                     struct obd_uuid *uuid)
1119 {
1120         return __class_new_export(obd, uuid, false);
1121 }
1122 EXPORT_SYMBOL(class_new_export);
1123
1124 struct obd_export *class_new_export_self(struct obd_device *obd,
1125                                          struct obd_uuid *uuid)
1126 {
1127         return __class_new_export(obd, uuid, true);
1128 }
1129
1130 void class_unlink_export(struct obd_export *exp)
1131 {
1132         class_handle_unhash(&exp->exp_handle);
1133
1134         if (exp->exp_obd->obd_self_export == exp) {
1135                 class_export_put(exp);
1136                 return;
1137         }
1138
1139         spin_lock(&exp->exp_obd->obd_dev_lock);
1140         /* delete an uuid-export hashitem from hashtables */
1141         if (exp != exp->exp_obd->obd_self_export)
1142                 obd_uuid_del(exp->exp_obd, exp);
1143
1144 #ifdef HAVE_SERVER_SUPPORT
1145         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1146                 struct tg_export_data   *ted = &exp->exp_target_data;
1147                 struct cfs_hash         *hash;
1148
1149                 /* Because obd_gen_hash will not be released until
1150                  * class_cleanup(), so hash should never be NULL here */
1151                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1152                 LASSERT(hash != NULL);
1153                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1154                              &exp->exp_gen_hash);
1155                 cfs_hash_putref(hash);
1156         }
1157 #endif /* HAVE_SERVER_SUPPORT */
1158
1159         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1160         list_del_init(&exp->exp_obd_chain_timed);
1161         exp->exp_obd->obd_num_exports--;
1162         spin_unlock(&exp->exp_obd->obd_dev_lock);
1163         atomic_inc(&obd_stale_export_num);
1164
1165         /* A reference is kept by obd_stale_exports list */
1166         obd_stale_export_put(exp);
1167 }
1168 EXPORT_SYMBOL(class_unlink_export);
1169
1170 /* Import management functions */
1171 static void obd_zombie_import_free(struct obd_import *imp)
1172 {
1173         ENTRY;
1174
1175         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1176                 imp->imp_obd->obd_name);
1177
1178         LASSERT(refcount_read(&imp->imp_refcount) == 0);
1179
1180         ptlrpc_put_connection_superhack(imp->imp_connection);
1181
1182         while (!list_empty(&imp->imp_conn_list)) {
1183                 struct obd_import_conn *imp_conn;
1184
1185                 imp_conn = list_entry(imp->imp_conn_list.next,
1186                                       struct obd_import_conn, oic_item);
1187                 list_del_init(&imp_conn->oic_item);
1188                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1189                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1190         }
1191
1192         LASSERT(imp->imp_sec == NULL);
1193         class_decref(imp->imp_obd, "import", imp);
1194         OBD_FREE_PTR(imp);
1195         EXIT;
1196 }
1197
1198 struct obd_import *class_import_get(struct obd_import *import)
1199 {
1200         refcount_inc(&import->imp_refcount);
1201         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1202                refcount_read(&import->imp_refcount),
1203                import->imp_obd->obd_name);
1204         return import;
1205 }
1206 EXPORT_SYMBOL(class_import_get);
1207
1208 void class_import_put(struct obd_import *imp)
1209 {
1210         ENTRY;
1211
1212         LASSERT(refcount_read(&imp->imp_refcount) > 0);
1213
1214         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1215                refcount_read(&imp->imp_refcount) - 1,
1216                imp->imp_obd->obd_name);
1217
1218         if (refcount_dec_and_test(&imp->imp_refcount)) {
1219                 CDEBUG(D_INFO, "final put import %p\n", imp);
1220                 obd_zombie_import_add(imp);
1221         }
1222
1223         EXIT;
1224 }
1225 EXPORT_SYMBOL(class_import_put);
1226
1227 static void init_imp_at(struct imp_at *at) {
1228         int i;
1229         at_init(&at->iat_net_latency, 0, 0);
1230         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1231                 /* max service estimates are tracked on the server side, so
1232                    don't use the AT history here, just use the last reported
1233                    val. (But keep hist for proc histogram, worst_ever) */
1234                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1235                         AT_FLG_NOHIST);
1236         }
1237 }
1238
1239 static void obd_zombie_imp_cull(struct work_struct *ws)
1240 {
1241         struct obd_import *import;
1242
1243         import = container_of(ws, struct obd_import, imp_zombie_work);
1244         obd_zombie_import_free(import);
1245 }
1246
1247 struct obd_import *class_new_import(struct obd_device *obd)
1248 {
1249         struct obd_import *imp;
1250         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1251
1252         OBD_ALLOC(imp, sizeof(*imp));
1253         if (imp == NULL)
1254                 return NULL;
1255
1256         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1257         INIT_LIST_HEAD(&imp->imp_replay_list);
1258         INIT_LIST_HEAD(&imp->imp_sending_list);
1259         INIT_LIST_HEAD(&imp->imp_delayed_list);
1260         INIT_LIST_HEAD(&imp->imp_committed_list);
1261         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1262         imp->imp_known_replied_xid = 0;
1263         imp->imp_replay_cursor = &imp->imp_committed_list;
1264         spin_lock_init(&imp->imp_lock);
1265         imp->imp_last_success_conn = 0;
1266         imp->imp_state = LUSTRE_IMP_NEW;
1267         imp->imp_obd = class_incref(obd, "import", imp);
1268         rwlock_init(&imp->imp_sec_lock);
1269         init_waitqueue_head(&imp->imp_recovery_waitq);
1270         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1271
1272         if (curr_pid_ns && curr_pid_ns->child_reaper)
1273                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1274         else
1275                 imp->imp_sec_refpid = 1;
1276
1277         refcount_set(&imp->imp_refcount, 2);
1278         atomic_set(&imp->imp_unregistering, 0);
1279         atomic_set(&imp->imp_inflight, 0);
1280         atomic_set(&imp->imp_replay_inflight, 0);
1281         atomic_set(&imp->imp_inval_count, 0);
1282         INIT_LIST_HEAD(&imp->imp_conn_list);
1283         init_imp_at(&imp->imp_at);
1284
1285         /* the default magic is V2, will be used in connect RPC, and
1286          * then adjusted according to the flags in request/reply. */
1287         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1288
1289         return imp;
1290 }
1291 EXPORT_SYMBOL(class_new_import);
1292
1293 void class_destroy_import(struct obd_import *import)
1294 {
1295         LASSERT(import != NULL);
1296         LASSERT(import != LP_POISON);
1297
1298         spin_lock(&import->imp_lock);
1299         import->imp_generation++;
1300         spin_unlock(&import->imp_lock);
1301         class_import_put(import);
1302 }
1303 EXPORT_SYMBOL(class_destroy_import);
1304
1305 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1306
1307 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1308 {
1309         spin_lock(&exp->exp_locks_list_guard);
1310
1311         LASSERT(lock->l_exp_refs_nr >= 0);
1312
1313         if (lock->l_exp_refs_target != NULL &&
1314             lock->l_exp_refs_target != exp) {
1315                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1316                               exp, lock, lock->l_exp_refs_target);
1317         }
1318         if ((lock->l_exp_refs_nr ++) == 0) {
1319                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1320                 lock->l_exp_refs_target = exp;
1321         }
1322         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1323                lock, exp, lock->l_exp_refs_nr);
1324         spin_unlock(&exp->exp_locks_list_guard);
1325 }
1326 EXPORT_SYMBOL(__class_export_add_lock_ref);
1327
1328 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1329 {
1330         spin_lock(&exp->exp_locks_list_guard);
1331         LASSERT(lock->l_exp_refs_nr > 0);
1332         if (lock->l_exp_refs_target != exp) {
1333                 LCONSOLE_WARN("lock %p, "
1334                               "mismatching export pointers: %p, %p\n",
1335                               lock, lock->l_exp_refs_target, exp);
1336         }
1337         if (-- lock->l_exp_refs_nr == 0) {
1338                 list_del_init(&lock->l_exp_refs_link);
1339                 lock->l_exp_refs_target = NULL;
1340         }
1341         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1342                lock, exp, lock->l_exp_refs_nr);
1343         spin_unlock(&exp->exp_locks_list_guard);
1344 }
1345 EXPORT_SYMBOL(__class_export_del_lock_ref);
1346 #endif
1347
1348 /* A connection defines an export context in which preallocation can
1349    be managed. This releases the export pointer reference, and returns
1350    the export handle, so the export refcount is 1 when this function
1351    returns. */
1352 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1353                   struct obd_uuid *cluuid)
1354 {
1355         struct obd_export *export;
1356         LASSERT(conn != NULL);
1357         LASSERT(obd != NULL);
1358         LASSERT(cluuid != NULL);
1359         ENTRY;
1360
1361         export = class_new_export(obd, cluuid);
1362         if (IS_ERR(export))
1363                 RETURN(PTR_ERR(export));
1364
1365         conn->cookie = export->exp_handle.h_cookie;
1366         class_export_put(export);
1367
1368         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1369                cluuid->uuid, conn->cookie);
1370         RETURN(0);
1371 }
1372 EXPORT_SYMBOL(class_connect);
1373
1374 /* if export is involved in recovery then clean up related things */
1375 static void class_export_recovery_cleanup(struct obd_export *exp)
1376 {
1377         struct obd_device *obd = exp->exp_obd;
1378
1379         spin_lock(&obd->obd_recovery_task_lock);
1380         if (obd->obd_recovering) {
1381                 if (exp->exp_in_recovery) {
1382                         spin_lock(&exp->exp_lock);
1383                         exp->exp_in_recovery = 0;
1384                         spin_unlock(&exp->exp_lock);
1385                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1386                         atomic_dec(&obd->obd_connected_clients);
1387                 }
1388
1389                 /* if called during recovery then should update
1390                  * obd_stale_clients counter,
1391                  * lightweight exports are not counted */
1392                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1393                         exp->exp_obd->obd_stale_clients++;
1394         }
1395         spin_unlock(&obd->obd_recovery_task_lock);
1396
1397         spin_lock(&exp->exp_lock);
1398         /** Cleanup req replay fields */
1399         if (exp->exp_req_replay_needed) {
1400                 exp->exp_req_replay_needed = 0;
1401
1402                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1403                 atomic_dec(&obd->obd_req_replay_clients);
1404         }
1405
1406         /** Cleanup lock replay data */
1407         if (exp->exp_lock_replay_needed) {
1408                 exp->exp_lock_replay_needed = 0;
1409
1410                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1411                 atomic_dec(&obd->obd_lock_replay_clients);
1412         }
1413         spin_unlock(&exp->exp_lock);
1414 }
1415
1416 /* This function removes 1-3 references from the export:
1417  * 1 - for export pointer passed
1418  * and if disconnect really need
1419  * 2 - removing from hash
1420  * 3 - in client_unlink_export
1421  * The export pointer passed to this function can destroyed */
1422 int class_disconnect(struct obd_export *export)
1423 {
1424         int already_disconnected;
1425         ENTRY;
1426
1427         if (export == NULL) {
1428                 CWARN("attempting to free NULL export %p\n", export);
1429                 RETURN(-EINVAL);
1430         }
1431
1432         spin_lock(&export->exp_lock);
1433         already_disconnected = export->exp_disconnected;
1434         export->exp_disconnected = 1;
1435         /*  We hold references of export for uuid hash
1436          *  and nid_hash and export link at least. So
1437          *  it is safe to call cfs_hash_del in there.  */
1438         if (!hlist_unhashed(&export->exp_nid_hash))
1439                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1440                              &export->exp_connection->c_peer.nid,
1441                              &export->exp_nid_hash);
1442         spin_unlock(&export->exp_lock);
1443
1444         /* class_cleanup(), abort_recovery(), and class_fail_export()
1445          * all end up in here, and if any of them race we shouldn't
1446          * call extra class_export_puts(). */
1447         if (already_disconnected) {
1448                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1449                 GOTO(no_disconn, already_disconnected);
1450         }
1451
1452         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1453                export->exp_handle.h_cookie);
1454
1455         class_export_recovery_cleanup(export);
1456         class_unlink_export(export);
1457 no_disconn:
1458         class_export_put(export);
1459         RETURN(0);
1460 }
1461 EXPORT_SYMBOL(class_disconnect);
1462
1463 /* Return non-zero for a fully connected export */
1464 int class_connected_export(struct obd_export *exp)
1465 {
1466         int connected = 0;
1467
1468         if (exp) {
1469                 spin_lock(&exp->exp_lock);
1470                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1471                 spin_unlock(&exp->exp_lock);
1472         }
1473         return connected;
1474 }
1475 EXPORT_SYMBOL(class_connected_export);
1476
1477 static void class_disconnect_export_list(struct list_head *list,
1478                                          enum obd_option flags)
1479 {
1480         int rc;
1481         struct obd_export *exp;
1482         ENTRY;
1483
1484         /* It's possible that an export may disconnect itself, but
1485          * nothing else will be added to this list. */
1486         while (!list_empty(list)) {
1487                 exp = list_entry(list->next, struct obd_export,
1488                                  exp_obd_chain);
1489                 /* need for safe call CDEBUG after obd_disconnect */
1490                 class_export_get(exp);
1491
1492                 spin_lock(&exp->exp_lock);
1493                 exp->exp_flags = flags;
1494                 spin_unlock(&exp->exp_lock);
1495
1496                 if (obd_uuid_equals(&exp->exp_client_uuid,
1497                                     &exp->exp_obd->obd_uuid)) {
1498                         CDEBUG(D_HA,
1499                                "exp %p export uuid == obd uuid, don't discon\n",
1500                                exp);
1501                         /* Need to delete this now so we don't end up pointing
1502                          * to work_list later when this export is cleaned up. */
1503                         list_del_init(&exp->exp_obd_chain);
1504                         class_export_put(exp);
1505                         continue;
1506                 }
1507
1508                 class_export_get(exp);
1509                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1510                        "last request at %lld\n",
1511                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1512                        exp, exp->exp_last_request_time);
1513                 /* release one export reference anyway */
1514                 rc = obd_disconnect(exp);
1515
1516                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1517                        obd_export_nid2str(exp), exp, rc);
1518                 class_export_put(exp);
1519         }
1520         EXIT;
1521 }
1522
1523 void class_disconnect_exports(struct obd_device *obd)
1524 {
1525         LIST_HEAD(work_list);
1526         ENTRY;
1527
1528         /* Move all of the exports from obd_exports to a work list, en masse. */
1529         spin_lock(&obd->obd_dev_lock);
1530         list_splice_init(&obd->obd_exports, &work_list);
1531         list_splice_init(&obd->obd_delayed_exports, &work_list);
1532         spin_unlock(&obd->obd_dev_lock);
1533
1534         if (!list_empty(&work_list)) {
1535                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1536                        "disconnecting them\n", obd->obd_minor, obd);
1537                 class_disconnect_export_list(&work_list,
1538                                              exp_flags_from_obd(obd));
1539         } else
1540                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1541                        obd->obd_minor, obd);
1542         EXIT;
1543 }
1544 EXPORT_SYMBOL(class_disconnect_exports);
1545
1546 /* Remove exports that have not completed recovery.
1547  */
1548 void class_disconnect_stale_exports(struct obd_device *obd,
1549                                     int (*test_export)(struct obd_export *))
1550 {
1551         LIST_HEAD(work_list);
1552         struct obd_export *exp, *n;
1553         int evicted = 0;
1554         ENTRY;
1555
1556         spin_lock(&obd->obd_dev_lock);
1557         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1558                                  exp_obd_chain) {
1559                 /* don't count self-export as client */
1560                 if (obd_uuid_equals(&exp->exp_client_uuid,
1561                                     &exp->exp_obd->obd_uuid))
1562                         continue;
1563
1564                 /* don't evict clients which have no slot in last_rcvd
1565                  * (e.g. lightweight connection) */
1566                 if (exp->exp_target_data.ted_lr_idx == -1)
1567                         continue;
1568
1569                 spin_lock(&exp->exp_lock);
1570                 if (exp->exp_failed || test_export(exp)) {
1571                         spin_unlock(&exp->exp_lock);
1572                         continue;
1573                 }
1574                 exp->exp_failed = 1;
1575                 spin_unlock(&exp->exp_lock);
1576
1577                 list_move(&exp->exp_obd_chain, &work_list);
1578                 evicted++;
1579                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1580                        obd->obd_name, exp->exp_client_uuid.uuid,
1581                        obd_export_nid2str(exp));
1582                 print_export_data(exp, "EVICTING", 0, D_HA);
1583         }
1584         spin_unlock(&obd->obd_dev_lock);
1585
1586         if (evicted)
1587                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1588                               obd->obd_name, evicted);
1589
1590         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1591                                                  OBD_OPT_ABORT_RECOV);
1592         EXIT;
1593 }
1594 EXPORT_SYMBOL(class_disconnect_stale_exports);
1595
1596 void class_fail_export(struct obd_export *exp)
1597 {
1598         int rc, already_failed;
1599
1600         spin_lock(&exp->exp_lock);
1601         already_failed = exp->exp_failed;
1602         exp->exp_failed = 1;
1603         spin_unlock(&exp->exp_lock);
1604
1605         if (already_failed) {
1606                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1607                        exp, exp->exp_client_uuid.uuid);
1608                 return;
1609         }
1610
1611         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1612                exp, exp->exp_client_uuid.uuid);
1613
1614         if (obd_dump_on_timeout)
1615                 libcfs_debug_dumplog();
1616
1617         /* need for safe call CDEBUG after obd_disconnect */
1618         class_export_get(exp);
1619
1620         /* Most callers into obd_disconnect are removing their own reference
1621          * (request, for example) in addition to the one from the hash table.
1622          * We don't have such a reference here, so make one. */
1623         class_export_get(exp);
1624         rc = obd_disconnect(exp);
1625         if (rc)
1626                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1627         else
1628                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1629                        exp, exp->exp_client_uuid.uuid);
1630         class_export_put(exp);
1631 }
1632 EXPORT_SYMBOL(class_fail_export);
1633
1634 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1635 {
1636         struct cfs_hash *nid_hash;
1637         struct obd_export *doomed_exp = NULL;
1638         int exports_evicted = 0;
1639
1640         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1641
1642         spin_lock(&obd->obd_dev_lock);
1643         /* umount has run already, so evict thread should leave
1644          * its task to umount thread now */
1645         if (obd->obd_stopping) {
1646                 spin_unlock(&obd->obd_dev_lock);
1647                 return exports_evicted;
1648         }
1649         nid_hash = obd->obd_nid_hash;
1650         cfs_hash_getref(nid_hash);
1651         spin_unlock(&obd->obd_dev_lock);
1652
1653         do {
1654                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1655                 if (doomed_exp == NULL)
1656                         break;
1657
1658                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1659                          "nid %s found, wanted nid %s, requested nid %s\n",
1660                          obd_export_nid2str(doomed_exp),
1661                          libcfs_nid2str(nid_key), nid);
1662                 LASSERTF(doomed_exp != obd->obd_self_export,
1663                          "self-export is hashed by NID?\n");
1664                 exports_evicted++;
1665                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1666                               "request\n", obd->obd_name,
1667                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1668                               obd_export_nid2str(doomed_exp));
1669                 class_fail_export(doomed_exp);
1670                 class_export_put(doomed_exp);
1671         } while (1);
1672
1673         cfs_hash_putref(nid_hash);
1674
1675         if (!exports_evicted)
1676                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1677                        obd->obd_name, nid);
1678         return exports_evicted;
1679 }
1680 EXPORT_SYMBOL(obd_export_evict_by_nid);
1681
1682 #ifdef HAVE_SERVER_SUPPORT
1683 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1684 {
1685         struct obd_export *doomed_exp = NULL;
1686         struct obd_uuid doomed_uuid;
1687         int exports_evicted = 0;
1688
1689         spin_lock(&obd->obd_dev_lock);
1690         if (obd->obd_stopping) {
1691                 spin_unlock(&obd->obd_dev_lock);
1692                 return exports_evicted;
1693         }
1694         spin_unlock(&obd->obd_dev_lock);
1695
1696         obd_str2uuid(&doomed_uuid, uuid);
1697         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1698                 CERROR("%s: can't evict myself\n", obd->obd_name);
1699                 return exports_evicted;
1700         }
1701
1702         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1703         if (doomed_exp == NULL) {
1704                 CERROR("%s: can't disconnect %s: no exports found\n",
1705                        obd->obd_name, uuid);
1706         } else {
1707                 CWARN("%s: evicting %s at adminstrative request\n",
1708                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1709                 class_fail_export(doomed_exp);
1710                 class_export_put(doomed_exp);
1711                 obd_uuid_del(obd, doomed_exp);
1712                 exports_evicted++;
1713         }
1714
1715         return exports_evicted;
1716 }
1717 #endif /* HAVE_SERVER_SUPPORT */
1718
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback run by print_export_data() for each export it prints
 * when lock dumping is requested; it stays NULL until someone assigns it. */
void (*class_export_dump_hook)(struct obd_export *exp);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1723
1724 static void print_export_data(struct obd_export *exp, const char *status,
1725                               int locks, int debug_level)
1726 {
1727         struct ptlrpc_reply_state *rs;
1728         struct ptlrpc_reply_state *first_reply = NULL;
1729         int nreplies = 0;
1730
1731         spin_lock(&exp->exp_lock);
1732         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1733                             rs_exp_list) {
1734                 if (nreplies == 0)
1735                         first_reply = rs;
1736                 nreplies++;
1737         }
1738         spin_unlock(&exp->exp_lock);
1739
1740         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1741                "%p %s %llu stale:%d\n",
1742                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1743                obd_export_nid2str(exp),
1744                refcount_read(&exp->exp_handle.h_ref),
1745                atomic_read(&exp->exp_rpc_count),
1746                atomic_read(&exp->exp_cb_count),
1747                atomic_read(&exp->exp_locks_count),
1748                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1749                nreplies, first_reply, nreplies > 3 ? "..." : "",
1750                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1751 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1752         if (locks && class_export_dump_hook != NULL)
1753                 class_export_dump_hook(exp);
1754 #endif
1755 }
1756
1757 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1758 {
1759         struct obd_export *exp;
1760
1761         spin_lock(&obd->obd_dev_lock);
1762         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1763                 print_export_data(exp, "ACTIVE", locks, debug_level);
1764         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1765                 print_export_data(exp, "UNLINKED", locks, debug_level);
1766         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1767                 print_export_data(exp, "DELAYED", locks, debug_level);
1768         spin_unlock(&obd->obd_dev_lock);
1769 }
1770
1771 void obd_exports_barrier(struct obd_device *obd)
1772 {
1773         int waited = 2;
1774         LASSERT(list_empty(&obd->obd_exports));
1775         spin_lock(&obd->obd_dev_lock);
1776         while (!list_empty(&obd->obd_unlinked_exports)) {
1777                 spin_unlock(&obd->obd_dev_lock);
1778                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1779                 if (waited > 5 && is_power_of_2(waited)) {
1780                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1781                                       "more than %d seconds. "
1782                                       "The obd refcount = %d. Is it stuck?\n",
1783                                       obd->obd_name, waited,
1784                                       atomic_read(&obd->obd_refcount));
1785                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1786                 }
1787                 waited *= 2;
1788                 spin_lock(&obd->obd_dev_lock);
1789         }
1790         spin_unlock(&obd->obd_dev_lock);
1791 }
1792 EXPORT_SYMBOL(obd_exports_barrier);
1793
1794 /**
1795  * Add export to the obd_zombe thread and notify it.
1796  */
1797 static void obd_zombie_export_add(struct obd_export *exp) {
1798         atomic_dec(&obd_stale_export_num);
1799         spin_lock(&exp->exp_obd->obd_dev_lock);
1800         LASSERT(!list_empty(&exp->exp_obd_chain));
1801         list_del_init(&exp->exp_obd_chain);
1802         spin_unlock(&exp->exp_obd->obd_dev_lock);
1803
1804         queue_work(zombie_wq, &exp->exp_zombie_work);
1805 }
1806
1807 /**
1808  * Add import to the obd_zombe thread and notify it.
1809  */
1810 static void obd_zombie_import_add(struct obd_import *imp) {
1811         LASSERT(imp->imp_sec == NULL);
1812
1813         queue_work(zombie_wq, &imp->imp_zombie_work);
1814 }
1815
1816 /**
1817  * wait when obd_zombie import/export queues become empty
1818  */
1819 void obd_zombie_barrier(void)
1820 {
1821         flush_workqueue(zombie_wq);
1822 }
1823 EXPORT_SYMBOL(obd_zombie_barrier);
1824
1825
1826 struct obd_export *obd_stale_export_get(void)
1827 {
1828         struct obd_export *exp = NULL;
1829         ENTRY;
1830
1831         spin_lock(&obd_stale_export_lock);
1832         if (!list_empty(&obd_stale_exports)) {
1833                 exp = list_entry(obd_stale_exports.next,
1834                                  struct obd_export, exp_stale_list);
1835                 list_del_init(&exp->exp_stale_list);
1836         }
1837         spin_unlock(&obd_stale_export_lock);
1838
1839         if (exp) {
1840                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1841                        atomic_read(&obd_stale_export_num));
1842         }
1843         RETURN(exp);
1844 }
1845 EXPORT_SYMBOL(obd_stale_export_get);
1846
1847 void obd_stale_export_put(struct obd_export *exp)
1848 {
1849         ENTRY;
1850
1851         LASSERT(list_empty(&exp->exp_stale_list));
1852         if (exp->exp_lock_hash &&
1853             atomic_read(&exp->exp_lock_hash->hs_count)) {
1854                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1855                        atomic_read(&obd_stale_export_num));
1856
1857                 spin_lock_bh(&exp->exp_bl_list_lock);
1858                 spin_lock(&obd_stale_export_lock);
1859                 /* Add to the tail if there is no blocked locks,
1860                  * to the head otherwise. */
1861                 if (list_empty(&exp->exp_bl_list))
1862                         list_add_tail(&exp->exp_stale_list,
1863                                       &obd_stale_exports);
1864                 else
1865                         list_add(&exp->exp_stale_list,
1866                                  &obd_stale_exports);
1867
1868                 spin_unlock(&obd_stale_export_lock);
1869                 spin_unlock_bh(&exp->exp_bl_list_lock);
1870         } else {
1871                 class_export_put(exp);
1872         }
1873         EXIT;
1874 }
1875 EXPORT_SYMBOL(obd_stale_export_put);
1876
1877 /**
1878  * Adjust the position of the export in the stale list,
1879  * i.e. move to the head of the list if is needed.
1880  **/
1881 void obd_stale_export_adjust(struct obd_export *exp)
1882 {
1883         LASSERT(exp != NULL);
1884         spin_lock_bh(&exp->exp_bl_list_lock);
1885         spin_lock(&obd_stale_export_lock);
1886
1887         if (!list_empty(&exp->exp_stale_list) &&
1888             !list_empty(&exp->exp_bl_list))
1889                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1890
1891         spin_unlock(&obd_stale_export_lock);
1892         spin_unlock_bh(&exp->exp_bl_list_lock);
1893 }
1894 EXPORT_SYMBOL(obd_stale_export_adjust);
1895
1896 /**
1897  * start destroy zombie import/export thread
1898  */
1899 int obd_zombie_impexp_init(void)
1900 {
1901         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1902         if (!zombie_wq)
1903                 return -ENOMEM;
1904
1905         return 0;
1906 }
1907
1908 /**
1909  * stop destroy zombie import/export thread
1910  */
1911 void obd_zombie_impexp_stop(void)
1912 {
1913         destroy_workqueue(zombie_wq);
1914         LASSERT(list_empty(&obd_stale_exports));
1915 }
1916
1917 /***** Kernel-userspace comm helpers *******/
1918
1919 /* Get length of entire message, including header */
1920 int kuc_len(int payload_len)
1921 {
1922         return sizeof(struct kuc_hdr) + payload_len;
1923 }
1924 EXPORT_SYMBOL(kuc_len);
1925
1926 /* Get a pointer to kuc header, given a ptr to the payload
1927  * @param p Pointer to payload area
1928  * @returns Pointer to kuc header
1929  */
1930 struct kuc_hdr * kuc_ptr(void *p)
1931 {
1932         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1933         LASSERT(lh->kuc_magic == KUC_MAGIC);
1934         return lh;
1935 }
1936 EXPORT_SYMBOL(kuc_ptr);
1937
1938 /* Alloc space for a message, and fill in header
1939  * @return Pointer to payload area
1940  */
1941 void *kuc_alloc(int payload_len, int transport, int type)
1942 {
1943         struct kuc_hdr *lh;
1944         int len = kuc_len(payload_len);
1945
1946         OBD_ALLOC(lh, len);
1947         if (lh == NULL)
1948                 return ERR_PTR(-ENOMEM);
1949
1950         lh->kuc_magic = KUC_MAGIC;
1951         lh->kuc_transport = transport;
1952         lh->kuc_msgtype = type;
1953         lh->kuc_msglen = len;
1954
1955         return (void *)(lh + 1);
1956 }
1957 EXPORT_SYMBOL(kuc_alloc);
1958
/* Free a message given a pointer to its payload area and the payload length
 * it was sized for; the header in front of the payload is released too. */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1966
1967 struct obd_request_slot_waiter {
1968         struct list_head        orsw_entry;
1969         wait_queue_head_t       orsw_waitq;
1970         bool                    orsw_signaled;
1971 };
1972
1973 static bool obd_request_slot_avail(struct client_obd *cli,
1974                                    struct obd_request_slot_waiter *orsw)
1975 {
1976         bool avail;
1977
1978         spin_lock(&cli->cl_loi_list_lock);
1979         avail = !!list_empty(&orsw->orsw_entry);
1980         spin_unlock(&cli->cl_loi_list_lock);
1981
1982         return avail;
1983 };
1984
1985 /*
1986  * For network flow control, the RPC sponsor needs to acquire a credit
1987  * before sending the RPC. The credits count for a connection is defined
1988  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1989  * the subsequent RPC sponsors need to wait until others released their
1990  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1991  */
1992 int obd_get_request_slot(struct client_obd *cli)
1993 {
1994         struct obd_request_slot_waiter   orsw;
1995         int                              rc;
1996
1997         spin_lock(&cli->cl_loi_list_lock);
1998         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1999                 cli->cl_rpcs_in_flight++;
2000                 spin_unlock(&cli->cl_loi_list_lock);
2001                 return 0;
2002         }
2003
2004         init_waitqueue_head(&orsw.orsw_waitq);
2005         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2006         orsw.orsw_signaled = false;
2007         spin_unlock(&cli->cl_loi_list_lock);
2008
2009         rc = l_wait_event_abortable(orsw.orsw_waitq,
2010                                     obd_request_slot_avail(cli, &orsw) ||
2011                                     orsw.orsw_signaled);
2012
2013         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2014          * freed but other (such as obd_put_request_slot) is using it. */
2015         spin_lock(&cli->cl_loi_list_lock);
2016         if (rc != 0) {
2017                 if (!orsw.orsw_signaled) {
2018                         if (list_empty(&orsw.orsw_entry))
2019                                 cli->cl_rpcs_in_flight--;
2020                         else
2021                                 list_del(&orsw.orsw_entry);
2022                 }
2023                 rc = -EINTR;
2024         }
2025
2026         if (orsw.orsw_signaled) {
2027                 LASSERT(list_empty(&orsw.orsw_entry));
2028
2029                 rc = -EINTR;
2030         }
2031         spin_unlock(&cli->cl_loi_list_lock);
2032
2033         return rc;
2034 }
2035 EXPORT_SYMBOL(obd_get_request_slot);
2036
2037 void obd_put_request_slot(struct client_obd *cli)
2038 {
2039         struct obd_request_slot_waiter *orsw;
2040
2041         spin_lock(&cli->cl_loi_list_lock);
2042         cli->cl_rpcs_in_flight--;
2043
2044         /* If there is free slot, wakeup the first waiter. */
2045         if (!list_empty(&cli->cl_flight_waiters) &&
2046             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2047                 orsw = list_entry(cli->cl_flight_waiters.next,
2048                                   struct obd_request_slot_waiter, orsw_entry);
2049                 list_del_init(&orsw->orsw_entry);
2050                 cli->cl_rpcs_in_flight++;
2051                 wake_up(&orsw->orsw_waitq);
2052         }
2053         spin_unlock(&cli->cl_loi_list_lock);
2054 }
2055 EXPORT_SYMBOL(obd_put_request_slot);
2056
2057 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2058 {
2059         return cli->cl_max_rpcs_in_flight;
2060 }
2061 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2062
2063 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2064 {
2065         struct obd_request_slot_waiter *orsw;
2066         __u32                           old;
2067         int                             diff;
2068         int                             i;
2069         const char *type_name;
2070         int                             rc;
2071
2072         if (max > OBD_MAX_RIF_MAX || max < 1)
2073                 return -ERANGE;
2074
2075         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2076         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2077                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2078                  * strictly lower that max_rpcs_in_flight */
2079                 if (max < 2) {
2080                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2081                                "because it must be higher than "
2082                                "max_mod_rpcs_in_flight value",
2083                                cli->cl_import->imp_obd->obd_name);
2084                         return -ERANGE;
2085                 }
2086                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2087                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2088                         if (rc != 0)
2089                                 return rc;
2090                 }
2091         }
2092
2093         spin_lock(&cli->cl_loi_list_lock);
2094         old = cli->cl_max_rpcs_in_flight;
2095         cli->cl_max_rpcs_in_flight = max;
2096         client_adjust_max_dirty(cli);
2097
2098         diff = max - old;
2099
2100         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2101         for (i = 0; i < diff; i++) {
2102                 if (list_empty(&cli->cl_flight_waiters))
2103                         break;
2104
2105                 orsw = list_entry(cli->cl_flight_waiters.next,
2106                                   struct obd_request_slot_waiter, orsw_entry);
2107                 list_del_init(&orsw->orsw_entry);
2108                 cli->cl_rpcs_in_flight++;
2109                 wake_up(&orsw->orsw_waitq);
2110         }
2111         spin_unlock(&cli->cl_loi_list_lock);
2112
2113         return 0;
2114 }
2115 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2116
2117 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2118 {
2119         return cli->cl_max_mod_rpcs_in_flight;
2120 }
2121 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2122
2123 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2124 {
2125         struct obd_connect_data *ocd;
2126         __u16 maxmodrpcs;
2127         __u16 prev;
2128
2129         if (max > OBD_MAX_RIF_MAX || max < 1)
2130                 return -ERANGE;
2131
2132         /* cannot exceed or equal max_rpcs_in_flight */
2133         if (max >= cli->cl_max_rpcs_in_flight) {
2134                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2135                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2136                        cli->cl_import->imp_obd->obd_name,
2137                        max, cli->cl_max_rpcs_in_flight);
2138                 return -ERANGE;
2139         }
2140
2141         /* cannot exceed max modify RPCs in flight supported by the server */
2142         ocd = &cli->cl_import->imp_connect_data;
2143         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2144                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2145         else
2146                 maxmodrpcs = 1;
2147         if (max > maxmodrpcs) {
2148                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2149                        "higher than max_mod_rpcs_per_client value (%hu) "
2150                        "returned by the server at connection\n",
2151                        cli->cl_import->imp_obd->obd_name,
2152                        max, maxmodrpcs);
2153                 return -ERANGE;
2154         }
2155
2156         spin_lock(&cli->cl_mod_rpcs_lock);
2157
2158         prev = cli->cl_max_mod_rpcs_in_flight;
2159         cli->cl_max_mod_rpcs_in_flight = max;
2160
2161         /* wakeup waiters if limit has been increased */
2162         if (cli->cl_max_mod_rpcs_in_flight > prev)
2163                 wake_up(&cli->cl_mod_rpcs_waitq);
2164
2165         spin_unlock(&cli->cl_mod_rpcs_lock);
2166
2167         return 0;
2168 }
2169 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2170
2171 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2172                                struct seq_file *seq)
2173 {
2174         unsigned long mod_tot = 0, mod_cum;
2175         struct timespec64 now;
2176         int i;
2177
2178         ktime_get_real_ts64(&now);
2179
2180         spin_lock(&cli->cl_mod_rpcs_lock);
2181
2182         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2183                    (s64)now.tv_sec, now.tv_nsec);
2184         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2185                    cli->cl_mod_rpcs_in_flight);
2186
2187         seq_printf(seq, "\n\t\t\tmodify\n");
2188         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2189
2190         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2191
2192         mod_cum = 0;
2193         for (i = 0; i < OBD_HIST_MAX; i++) {
2194                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2195                 mod_cum += mod;
2196                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2197                            i, mod, pct(mod, mod_tot),
2198                            pct(mod_cum, mod_tot));
2199                 if (mod_cum == mod_tot)
2200                         break;
2201         }
2202
2203         spin_unlock(&cli->cl_mod_rpcs_lock);
2204
2205         return 0;
2206 }
2207 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2208
2209 /* The number of modify RPCs sent in parallel is limited
2210  * because the server has a finite number of slots per client to
2211  * store request result and ensure reply reconstruction when needed.
2212  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2213  * that takes into account server limit and cl_max_rpcs_in_flight
2214  * value.
2215  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2216  * one close request is allowed above the maximum.
2217  */
2218 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2219                                                  bool close_req)
2220 {
2221         bool avail;
2222
2223         /* A slot is available if
2224          * - number of modify RPCs in flight is less than the max
2225          * - it's a close RPC and no other close request is in flight
2226          */
2227         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2228                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2229
2230         return avail;
2231 }
2232
2233 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2234                                          bool close_req)
2235 {
2236         bool avail;
2237
2238         spin_lock(&cli->cl_mod_rpcs_lock);
2239         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2240         spin_unlock(&cli->cl_mod_rpcs_lock);
2241         return avail;
2242 }
2243
2244
2245 /* Get a modify RPC slot from the obd client @cli according
2246  * to the kind of operation @opc that is going to be sent
2247  * and the intent @it of the operation if it applies.
2248  * If the maximum number of modify RPCs in flight is reached
2249  * the thread is put to sleep.
2250  * Returns the tag to be set in the request message. Tag 0
2251  * is reserved for non-modifying requests.
2252  */
2253 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2254 {
2255         bool                    close_req = false;
2256         __u16                   i, max;
2257
2258         if (opc == MDS_CLOSE)
2259                 close_req = true;
2260
2261         do {
2262                 spin_lock(&cli->cl_mod_rpcs_lock);
2263                 max = cli->cl_max_mod_rpcs_in_flight;
2264                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2265                         /* there is a slot available */
2266                         cli->cl_mod_rpcs_in_flight++;
2267                         if (close_req)
2268                                 cli->cl_close_rpcs_in_flight++;
2269                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2270                                          cli->cl_mod_rpcs_in_flight);
2271                         /* find a free tag */
2272                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2273                                                 max + 1);
2274                         LASSERT(i < OBD_MAX_RIF_MAX);
2275                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2276                         spin_unlock(&cli->cl_mod_rpcs_lock);
2277                         /* tag 0 is reserved for non-modify RPCs */
2278
2279                         CDEBUG(D_RPCTRACE,
2280                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2281                                cli->cl_import->imp_obd->obd_name,
2282                                i + 1, opc, max);
2283
2284                         return i + 1;
2285                 }
2286                 spin_unlock(&cli->cl_mod_rpcs_lock);
2287
2288                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2289                        "opc %u, max %hu\n",
2290                        cli->cl_import->imp_obd->obd_name, opc, max);
2291
2292                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2293                                           obd_mod_rpc_slot_avail(cli,
2294                                                                  close_req));
2295         } while (true);
2296 }
2297 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2298
2299 /* Put a modify RPC slot from the obd client @cli according
2300  * to the kind of operation @opc that has been sent.
2301  */
2302 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2303 {
2304         bool                    close_req = false;
2305
2306         if (tag == 0)
2307                 return;
2308
2309         if (opc == MDS_CLOSE)
2310                 close_req = true;
2311
2312         spin_lock(&cli->cl_mod_rpcs_lock);
2313         cli->cl_mod_rpcs_in_flight--;
2314         if (close_req)
2315                 cli->cl_close_rpcs_in_flight--;
2316         /* release the tag in the bitmap */
2317         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2318         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2319         spin_unlock(&cli->cl_mod_rpcs_lock);
2320         wake_up(&cli->cl_mod_rpcs_waitq);
2321 }
2322 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2323