Whamcloud - gitweb
aca1fec8ec7bd287e56160135bec6292b2fef300
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Taken by the lookup/registration helpers in this file around scans and
 * updates of obd_devs[]. */
DEFINE_RWLOCK(obd_dev_lock);
/* Registered devices, indexed by obd_minor; a NULL entry is a free slot. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache backing obd_device_alloc()/obd_device_free(). */
static struct kmem_cache *obd_device_cachep;
/* kobject type used for every obd_type; defined further down. */
static struct kobj_type class_ktype;
/* NOTE(review): presumably the workqueue that tears down zombie
 * exports/imports; its users are not visible here -- confirm. */
static struct workqueue_struct *zombie_wq;

static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* NOTE(review): stale-export list, its lock and its element count; the
 * code that maintains them is not visible here -- confirm semantics. */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/* Hook called by class_export_destroy() to release an export's connection. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         module_put(type->typ_dt_ops->o_owner);
158         atomic_dec(&type->typ_refcnt);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         OBD_FREE(type, sizeof(*type));
176 }
177
/* kobject type for obd_type objects: attribute access goes through
 * lustre_sysfs_ops, and dropping the last reference runs
 * class_sysfs_release(), which frees the obd_type. */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
182
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
185 {
186         struct dentry *symlink;
187         struct obd_type *type;
188         int rc;
189
190         type = class_search_type(name);
191         if (type) {
192                 kobject_put(&type->typ_kobj);
193                 return ERR_PTR(-EEXIST);
194         }
195
196         OBD_ALLOC(type, sizeof(*type));
197         if (!type)
198                 return ERR_PTR(-ENOMEM);
199
200         type->typ_kobj.kset = lustre_kset;
201         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202                                   &lustre_kset->kobj, "%s", name);
203         if (rc)
204                 return ERR_PTR(rc);
205
206         symlink = debugfs_create_dir(name, debugfs_lustre_root);
207         if (IS_ERR_OR_NULL(symlink)) {
208                 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
209                 kobject_put(&type->typ_kobj);
210                 return ERR_PTR(rc);
211         }
212         type->typ_debugfs_entry = symlink;
213         type->typ_sym_filter = true;
214
215         if (enable_proc) {
216                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
217                                                       NULL, NULL);
218                 if (IS_ERR(type->typ_procroot)) {
219                         CERROR("%s: can't create compat proc entry: %d\n",
220                                name, (int)PTR_ERR(type->typ_procroot));
221                         type->typ_procroot = NULL;
222                 }
223         }
224
225         return type;
226 }
227 EXPORT_SYMBOL(class_add_symlinks);
228 #endif /* HAVE_SERVER_SUPPORT */
229
230 #define CLASS_MAX_NAME 1024
231
/**
 * Register a new obd type.
 *
 * Creates (or, with HAVE_SERVER_SUPPORT, reuses a type pre-created by
 * class_add_symlinks()) the obd_type for \a name, installs its operation
 * tables, creates its procfs/debugfs entries, adds its kobject under
 * lustre_kset and initializes \a ldt if one is given.
 *
 * \param[in] dt_ops       obd operations stored in typ_dt_ops
 * \param[in] md_ops       metadata operations stored in typ_md_ops
 * \param[in] enable_proc  create a procfs directory (CONFIG_PROC_FS only)
 * \param[in] vars         variables passed to ldebugfs_register()
 * \param[in] name         type name, shorter than CLASS_MAX_NAME
 * \param[in] ldt          optional lu_device_type to initialize
 *
 * \retval 0        success
 * \retval -EEXIST  type is already registered
 * \retval -ENOMEM  allocation failed
 * \retval errno    procfs/debugfs/kobject/lu_device_type setup failed
 */
int class_register_type(const struct obd_ops *dt_ops,
                        const struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        /* on success this lookup holds a kobject reference on the type */
        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* placeholder created by class_add_symlinks(): reuse it
                 * and skip the allocation below */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_dt_ops = dt_ops;
        type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
        if (type->typ_sym_filter) {
                /* the placeholder already has its kobject and debugfs
                 * entry: clear the flag, drop the lookup reference taken
                 * above and go straight to the lu_device_type setup */
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* reset so class_sysfs_release() does not try to
                         * remove a procfs subtree for this type */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
                                                    vars, type);
        if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
                rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
                                             : -ENOMEM;
                type->typ_debugfs_entry = NULL;
                GOTO(failed, rc);
        }

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                type->typ_lu = ldt;
                rc = lu_device_type_init(ldt);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /* drops a kobject reference; when it is the last one,
         * class_sysfs_release() tears down and frees the type */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
317
318 int class_unregister_type(const char *name)
319 {
320         struct obd_type *type = class_search_type(name);
321         int rc = 0;
322         ENTRY;
323
324         if (!type) {
325                 CERROR("unknown obd type\n");
326                 RETURN(-EINVAL);
327         }
328
329         if (atomic_read(&type->typ_refcnt)) {
330                 CERROR("type %s has refcount (%d)\n", name,
331                        atomic_read(&type->typ_refcnt));
332                 /* This is a bad situation, let's make the best of it */
333                 /* Remove ops, but leave the name for debugging */
334                 type->typ_dt_ops = NULL;
335                 type->typ_md_ops = NULL;
336                 GOTO(out_put, rc = -EBUSY);
337         }
338
339         /* Put the final ref */
340         kobject_put(&type->typ_kobj);
341 out_put:
342         /* Put the ref returned by class_search_type() */
343         kobject_put(&type->typ_kobj);
344
345         RETURN(rc);
346 } /* class_unregister_type */
347 EXPORT_SYMBOL(class_unregister_type);
348
349 /**
350  * Create a new obd device.
351  *
352  * Allocate the new obd_device and initialize it.
353  *
354  * \param[in] type_name obd device type string.
355  * \param[in] name      obd device name.
356  * \param[in] uuid      obd device UUID
357  *
358  * \retval newdev         pointer to created obd_device
359  * \retval ERR_PTR(errno) on error
360  */
361 struct obd_device *class_newdev(const char *type_name, const char *name,
362                                 const char *uuid)
363 {
364         struct obd_device *newdev;
365         struct obd_type *type = NULL;
366         ENTRY;
367
368         if (strlen(name) >= MAX_OBD_NAME) {
369                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
370                 RETURN(ERR_PTR(-EINVAL));
371         }
372
373         type = class_get_type(type_name);
374         if (type == NULL){
375                 CERROR("OBD: unknown type: %s\n", type_name);
376                 RETURN(ERR_PTR(-ENODEV));
377         }
378
379         newdev = obd_device_alloc();
380         if (newdev == NULL) {
381                 class_put_type(type);
382                 RETURN(ERR_PTR(-ENOMEM));
383         }
384         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
385         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
386         newdev->obd_type = type;
387         newdev->obd_minor = -1;
388
389         rwlock_init(&newdev->obd_pool_lock);
390         newdev->obd_pool_limit = 0;
391         newdev->obd_pool_slv = 0;
392
393         INIT_LIST_HEAD(&newdev->obd_exports);
394         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
395         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
396         INIT_LIST_HEAD(&newdev->obd_exports_timed);
397         INIT_LIST_HEAD(&newdev->obd_nid_stats);
398         spin_lock_init(&newdev->obd_nid_lock);
399         spin_lock_init(&newdev->obd_dev_lock);
400         mutex_init(&newdev->obd_dev_mutex);
401         spin_lock_init(&newdev->obd_osfs_lock);
402         /* newdev->obd_osfs_age must be set to a value in the distant
403          * past to guarantee a fresh statfs is fetched on mount. */
404         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
405
406         /* XXX belongs in setup not attach  */
407         init_rwsem(&newdev->obd_observer_link_sem);
408         /* recovery data */
409         spin_lock_init(&newdev->obd_recovery_task_lock);
410         init_waitqueue_head(&newdev->obd_next_transno_waitq);
411         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
412         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
413         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
414         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
415         INIT_LIST_HEAD(&newdev->obd_evict_list);
416         INIT_LIST_HEAD(&newdev->obd_lwp_list);
417
418         llog_group_init(&newdev->obd_olg);
419         /* Detach drops this */
420         atomic_set(&newdev->obd_refcount, 1);
421         lu_ref_init(&newdev->obd_reference);
422         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
423
424         newdev->obd_conn_inprogress = 0;
425
426         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
427
428         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
429                newdev->obd_name, newdev);
430
431         return newdev;
432 }
433
434 /**
435  * Free obd device.
436  *
437  * \param[in] obd obd_device to be freed
438  *
439  * \retval none
440  */
441 void class_free_dev(struct obd_device *obd)
442 {
443         struct obd_type *obd_type = obd->obd_type;
444
445         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
446                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
448                  "obd %p != obd_devs[%d] %p\n",
449                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
450         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
451                  "obd_refcount should be 0, not %d\n",
452                  atomic_read(&obd->obd_refcount));
453         LASSERT(obd_type != NULL);
454
455         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
456                obd->obd_name, obd->obd_type->typ_name);
457
458         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
459                          obd->obd_name, obd->obd_uuid.uuid);
460         if (obd->obd_stopping) {
461                 int err;
462
463                 /* If we're not stopping, we were never set up */
464                 err = obd_cleanup(obd);
465                 if (err)
466                         CERROR("Cleanup %s returned %d\n",
467                                 obd->obd_name, err);
468         }
469
470         obd_device_free(obd);
471
472         class_put_type(obd_type);
473 }
474
475 /**
476  * Unregister obd device.
477  *
478  * Free slot in obd_dev[] used by \a obd.
479  *
480  * \param[in] new_obd obd_device to be unregistered
481  *
482  * \retval none
483  */
484 void class_unregister_device(struct obd_device *obd)
485 {
486         write_lock(&obd_dev_lock);
487         if (obd->obd_minor >= 0) {
488                 LASSERT(obd_devs[obd->obd_minor] == obd);
489                 obd_devs[obd->obd_minor] = NULL;
490                 obd->obd_minor = -1;
491         }
492         write_unlock(&obd_dev_lock);
493 }
494
495 /**
496  * Register obd device.
497  *
498  * Find free slot in obd_devs[], fills it with \a new_obd.
499  *
500  * \param[in] new_obd obd_device to be registered
501  *
502  * \retval 0          success
503  * \retval -EEXIST    device with this name is registered
504  * \retval -EOVERFLOW obd_devs[] is full
505  */
506 int class_register_device(struct obd_device *new_obd)
507 {
508         int ret = 0;
509         int i;
510         int new_obd_minor = 0;
511         bool minor_assign = false;
512         bool retried = false;
513
514 again:
515         write_lock(&obd_dev_lock);
516         for (i = 0; i < class_devno_max(); i++) {
517                 struct obd_device *obd = class_num2obd(i);
518
519                 if (obd != NULL &&
520                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
521
522                         if (!retried) {
523                                 write_unlock(&obd_dev_lock);
524
525                                 /* the obd_device could be waited to be
526                                  * destroyed by the "obd_zombie_impexp_thread".
527                                  */
528                                 obd_zombie_barrier();
529                                 retried = true;
530                                 goto again;
531                         }
532
533                         CERROR("%s: already exists, won't add\n",
534                                obd->obd_name);
535                         /* in case we found a free slot before duplicate */
536                         minor_assign = false;
537                         ret = -EEXIST;
538                         break;
539                 }
540                 if (!minor_assign && obd == NULL) {
541                         new_obd_minor = i;
542                         minor_assign = true;
543                 }
544         }
545
546         if (minor_assign) {
547                 new_obd->obd_minor = new_obd_minor;
548                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
549                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
550                 obd_devs[new_obd_minor] = new_obd;
551         } else {
552                 if (ret == 0) {
553                         ret = -EOVERFLOW;
554                         CERROR("%s: all %u/%u devices used, increase "
555                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
556                                i, class_devno_max(), ret);
557                 }
558         }
559         write_unlock(&obd_dev_lock);
560
561         RETURN(ret);
562 }
563
564 static int class_name2dev_nolock(const char *name)
565 {
566         int i;
567
568         if (!name)
569                 return -1;
570
571         for (i = 0; i < class_devno_max(); i++) {
572                 struct obd_device *obd = class_num2obd(i);
573
574                 if (obd && strcmp(name, obd->obd_name) == 0) {
575                         /* Make sure we finished attaching before we give
576                            out any references */
577                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
578                         if (obd->obd_attached) {
579                                 return i;
580                         }
581                         break;
582                 }
583         }
584
585         return -1;
586 }
587
588 int class_name2dev(const char *name)
589 {
590         int i;
591
592         if (!name)
593                 return -1;
594
595         read_lock(&obd_dev_lock);
596         i = class_name2dev_nolock(name);
597         read_unlock(&obd_dev_lock);
598
599         return i;
600 }
601 EXPORT_SYMBOL(class_name2dev);
602
603 struct obd_device *class_name2obd(const char *name)
604 {
605         int dev = class_name2dev(name);
606
607         if (dev < 0 || dev > class_devno_max())
608                 return NULL;
609         return class_num2obd(dev);
610 }
611 EXPORT_SYMBOL(class_name2obd);
612
613 int class_uuid2dev_nolock(struct obd_uuid *uuid)
614 {
615         int i;
616
617         for (i = 0; i < class_devno_max(); i++) {
618                 struct obd_device *obd = class_num2obd(i);
619
620                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
621                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
622                         return i;
623                 }
624         }
625
626         return -1;
627 }
628
629 int class_uuid2dev(struct obd_uuid *uuid)
630 {
631         int i;
632
633         read_lock(&obd_dev_lock);
634         i = class_uuid2dev_nolock(uuid);
635         read_unlock(&obd_dev_lock);
636
637         return i;
638 }
639 EXPORT_SYMBOL(class_uuid2dev);
640
641 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
642 {
643         int dev = class_uuid2dev(uuid);
644         if (dev < 0)
645                 return NULL;
646         return class_num2obd(dev);
647 }
648 EXPORT_SYMBOL(class_uuid2obd);
649
650 /**
651  * Get obd device from ::obd_devs[]
652  *
653  * \param num [in] array index
654  *
655  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
656  *         otherwise return the obd device there.
657  */
658 struct obd_device *class_num2obd(int num)
659 {
660         struct obd_device *obd = NULL;
661
662         if (num < class_devno_max()) {
663                 obd = obd_devs[num];
664                 if (obd == NULL)
665                         return NULL;
666
667                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
668                          "%p obd_magic %08x != %08x\n",
669                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
670                 LASSERTF(obd->obd_minor == num,
671                          "%p obd_minor %0d != %0d\n",
672                          obd, obd->obd_minor, num);
673         }
674
675         return obd;
676 }
677
678 /**
679  * Find obd in obd_dev[] by name or uuid.
680  *
681  * Increment obd's refcount if found.
682  *
683  * \param[in] str obd name or uuid
684  *
685  * \retval NULL    if not found
686  * \retval target  pointer to found obd_device
687  */
688 struct obd_device *class_dev_by_str(const char *str)
689 {
690         struct obd_device *target = NULL;
691         struct obd_uuid tgtuuid;
692         int rc;
693
694         obd_str2uuid(&tgtuuid, str);
695
696         read_lock(&obd_dev_lock);
697         rc = class_uuid2dev_nolock(&tgtuuid);
698         if (rc < 0)
699                 rc = class_name2dev_nolock(str);
700
701         if (rc >= 0)
702                 target = class_num2obd(rc);
703
704         if (target != NULL)
705                 class_incref(target, "find", current);
706         read_unlock(&obd_dev_lock);
707
708         RETURN(target);
709 }
710 EXPORT_SYMBOL(class_dev_by_str);
711
712 /**
713  * Get obd devices count. Device in any
714  *    state are counted
715  * \retval obd device count
716  */
717 int get_devices_count(void)
718 {
719         int index, max_index = class_devno_max(), dev_count = 0;
720
721         read_lock(&obd_dev_lock);
722         for (index = 0; index <= max_index; index++) {
723                 struct obd_device *obd = class_num2obd(index);
724                 if (obd != NULL)
725                         dev_count++;
726         }
727         read_unlock(&obd_dev_lock);
728
729         return dev_count;
730 }
731 EXPORT_SYMBOL(get_devices_count);
732
733 void class_obd_list(void)
734 {
735         char *status;
736         int i;
737
738         read_lock(&obd_dev_lock);
739         for (i = 0; i < class_devno_max(); i++) {
740                 struct obd_device *obd = class_num2obd(i);
741
742                 if (obd == NULL)
743                         continue;
744                 if (obd->obd_stopping)
745                         status = "ST";
746                 else if (obd->obd_set_up)
747                         status = "UP";
748                 else if (obd->obd_attached)
749                         status = "AT";
750                 else
751                         status = "--";
752                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
753                          i, status, obd->obd_type->typ_name,
754                          obd->obd_name, obd->obd_uuid.uuid,
755                          atomic_read(&obd->obd_refcount));
756         }
757         read_unlock(&obd_dev_lock);
758 }
759
760 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
761    specified, then only the client with that uuid is returned,
762    otherwise any client connected to the tgt is returned. */
763 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
764                                           const char *type_name,
765                                           struct obd_uuid *grp_uuid)
766 {
767         int i;
768
769         read_lock(&obd_dev_lock);
770         for (i = 0; i < class_devno_max(); i++) {
771                 struct obd_device *obd = class_num2obd(i);
772
773                 if (obd == NULL)
774                         continue;
775                 if ((strncmp(obd->obd_type->typ_name, type_name,
776                              strlen(type_name)) == 0)) {
777                         if (obd_uuid_equals(tgt_uuid,
778                                             &obd->u.cli.cl_target_uuid) &&
779                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
780                                                          &obd->obd_uuid) : 1)) {
781                                 read_unlock(&obd_dev_lock);
782                                 return obd;
783                         }
784                 }
785         }
786         read_unlock(&obd_dev_lock);
787
788         return NULL;
789 }
790 EXPORT_SYMBOL(class_find_client_obd);
791
792 /* Iterate the obd_device list looking devices have grp_uuid. Start
793    searching at *next, and if a device is found, the next index to look
794    at is saved in *next. If next is NULL, then the first matching device
795    will always be returned. */
796 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
797 {
798         int i;
799
800         if (next == NULL)
801                 i = 0;
802         else if (*next >= 0 && *next < class_devno_max())
803                 i = *next;
804         else
805                 return NULL;
806
807         read_lock(&obd_dev_lock);
808         for (; i < class_devno_max(); i++) {
809                 struct obd_device *obd = class_num2obd(i);
810
811                 if (obd == NULL)
812                         continue;
813                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
814                         if (next != NULL)
815                                 *next = i+1;
816                         read_unlock(&obd_dev_lock);
817                         return obd;
818                 }
819         }
820         read_unlock(&obd_dev_lock);
821
822         return NULL;
823 }
824 EXPORT_SYMBOL(class_devices_in_group);
825
826 /**
827  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
828  * adjust sptlrpc settings accordingly.
829  */
830 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
831 {
832         struct obd_device  *obd;
833         const char         *type;
834         int                 i, rc = 0, rc2;
835
836         LASSERT(namelen > 0);
837
838         read_lock(&obd_dev_lock);
839         for (i = 0; i < class_devno_max(); i++) {
840                 obd = class_num2obd(i);
841
842                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
843                         continue;
844
845                 /* only notify mdc, osc, osp, lwp, mdt, ost
846                  * because only these have a -sptlrpc llog */
847                 type = obd->obd_type->typ_name;
848                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
849                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
850                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
851                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
852                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
853                     strcmp(type, LUSTRE_OST_NAME) != 0)
854                         continue;
855
856                 if (strncmp(obd->obd_name, fsname, namelen))
857                         continue;
858
859                 class_incref(obd, __FUNCTION__, obd);
860                 read_unlock(&obd_dev_lock);
861                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
862                                          sizeof(KEY_SPTLRPC_CONF),
863                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
864                 rc = rc ? rc : rc2;
865                 class_decref(obd, __FUNCTION__, obd);
866                 read_lock(&obd_dev_lock);
867         }
868         read_unlock(&obd_dev_lock);
869         return rc;
870 }
871 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
872
873 void obd_cleanup_caches(void)
874 {
875         ENTRY;
876         if (obd_device_cachep) {
877                 kmem_cache_destroy(obd_device_cachep);
878                 obd_device_cachep = NULL;
879         }
880
881         EXIT;
882 }
883
884 int obd_init_caches(void)
885 {
886         int rc;
887         ENTRY;
888
889         LASSERT(obd_device_cachep == NULL);
890         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
891                                 sizeof(struct obd_device),
892                                 0, 0, 0, sizeof(struct obd_device), NULL);
893         if (!obd_device_cachep)
894                 GOTO(out, rc = -ENOMEM);
895
896         RETURN(0);
897 out:
898         obd_cleanup_caches();
899         RETURN(rc);
900 }
901
/* Owner tag passed to class_handle2object() when resolving export cookies. */
static const char export_handle_owner[] = "export";
903
904 /* map connection to client */
905 struct obd_export *class_conn2export(struct lustre_handle *conn)
906 {
907         struct obd_export *export;
908         ENTRY;
909
910         if (!conn) {
911                 CDEBUG(D_CACHE, "looking for null handle\n");
912                 RETURN(NULL);
913         }
914
915         if (conn->cookie == -1) {  /* this means assign a new connection */
916                 CDEBUG(D_CACHE, "want a new connection\n");
917                 RETURN(NULL);
918         }
919
920         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
921         export = class_handle2object(conn->cookie, export_handle_owner);
922         RETURN(export);
923 }
924 EXPORT_SYMBOL(class_conn2export);
925
926 struct obd_device *class_exp2obd(struct obd_export *exp)
927 {
928         if (exp)
929                 return exp->exp_obd;
930         return NULL;
931 }
932 EXPORT_SYMBOL(class_exp2obd);
933
934 struct obd_import *class_exp2cliimp(struct obd_export *exp)
935 {
936         struct obd_device *obd = exp->exp_obd;
937         if (obd == NULL)
938                 return NULL;
939         return obd->u.cli.cl_import;
940 }
941 EXPORT_SYMBOL(class_exp2cliimp);
942
943 /* Export management functions */
944 static void class_export_destroy(struct obd_export *exp)
945 {
946         struct obd_device *obd = exp->exp_obd;
947         ENTRY;
948
949         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
950         LASSERT(obd != NULL);
951
952         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
953                exp->exp_client_uuid.uuid, obd->obd_name);
954
955         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
956         if (exp->exp_connection)
957                 ptlrpc_put_connection_superhack(exp->exp_connection);
958
959         LASSERT(list_empty(&exp->exp_outstanding_replies));
960         LASSERT(list_empty(&exp->exp_uncommitted_replies));
961         LASSERT(list_empty(&exp->exp_req_replay_queue));
962         LASSERT(list_empty(&exp->exp_hp_rpcs));
963         obd_destroy_export(exp);
964         /* self export doesn't hold a reference to an obd, although it
965          * exists until freeing of the obd */
966         if (exp != obd->obd_self_export)
967                 class_decref(obd, "export", exp);
968
969         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
970         kfree_rcu(exp, exp_handle.h_rcu);
971         EXIT;
972 }
973
974 struct obd_export *class_export_get(struct obd_export *exp)
975 {
976         refcount_inc(&exp->exp_handle.h_ref);
977         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
978                refcount_read(&exp->exp_handle.h_ref));
979         return exp;
980 }
981 EXPORT_SYMBOL(class_export_get);
982
983 void class_export_put(struct obd_export *exp)
984 {
985         LASSERT(exp != NULL);
986         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
987         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
988         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
989                refcount_read(&exp->exp_handle.h_ref) - 1);
990
991         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
992                 struct obd_device *obd = exp->exp_obd;
993
994                 CDEBUG(D_IOCTL, "final put %p/%s\n",
995                        exp, exp->exp_client_uuid.uuid);
996
997                 /* release nid stat refererence */
998                 lprocfs_exp_cleanup(exp);
999
1000                 if (exp == obd->obd_self_export) {
1001                         /* self export should be destroyed without
1002                          * zombie thread as it doesn't hold a
1003                          * reference to obd and doesn't hold any
1004                          * resources */
1005                         class_export_destroy(exp);
1006                         /* self export is destroyed, no class
1007                          * references exist and it is safe to free
1008                          * obd */
1009                         class_free_dev(obd);
1010                 } else {
1011                         LASSERT(!list_empty(&exp->exp_obd_chain));
1012                         obd_zombie_export_add(exp);
1013                 }
1014
1015         }
1016 }
1017 EXPORT_SYMBOL(class_export_put);
1018
1019 static void obd_zombie_exp_cull(struct work_struct *ws)
1020 {
1021         struct obd_export *export;
1022
1023         export = container_of(ws, struct obd_export, exp_zombie_work);
1024         class_export_destroy(export);
1025 }
1026
1027 /* Creates a new export, adds it to the hash table, and returns a
1028  * pointer to it. The refcount is 2: one for the hash reference, and
1029  * one for the pointer returned by this function. */
1030 struct obd_export *__class_new_export(struct obd_device *obd,
1031                                       struct obd_uuid *cluuid, bool is_self)
1032 {
1033         struct obd_export *export;
1034         int rc = 0;
1035         ENTRY;
1036
1037         OBD_ALLOC_PTR(export);
1038         if (!export)
1039                 return ERR_PTR(-ENOMEM);
1040
1041         export->exp_conn_cnt = 0;
1042         export->exp_lock_hash = NULL;
1043         export->exp_flock_hash = NULL;
1044         /* 2 = class_handle_hash + last */
1045         refcount_set(&export->exp_handle.h_ref, 2);
1046         atomic_set(&export->exp_rpc_count, 0);
1047         atomic_set(&export->exp_cb_count, 0);
1048         atomic_set(&export->exp_locks_count, 0);
1049 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1050         INIT_LIST_HEAD(&export->exp_locks_list);
1051         spin_lock_init(&export->exp_locks_list_guard);
1052 #endif
1053         atomic_set(&export->exp_replay_count, 0);
1054         export->exp_obd = obd;
1055         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1056         spin_lock_init(&export->exp_uncommitted_replies_lock);
1057         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1058         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1059         INIT_HLIST_NODE(&export->exp_handle.h_link);
1060         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1061         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1062         class_handle_hash(&export->exp_handle, export_handle_owner);
1063         export->exp_last_request_time = ktime_get_real_seconds();
1064         spin_lock_init(&export->exp_lock);
1065         spin_lock_init(&export->exp_rpc_lock);
1066         INIT_HLIST_NODE(&export->exp_nid_hash);
1067         INIT_HLIST_NODE(&export->exp_gen_hash);
1068         spin_lock_init(&export->exp_bl_list_lock);
1069         INIT_LIST_HEAD(&export->exp_bl_list);
1070         INIT_LIST_HEAD(&export->exp_stale_list);
1071         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1072
1073         export->exp_sp_peer = LUSTRE_SP_ANY;
1074         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1075         export->exp_client_uuid = *cluuid;
1076         obd_init_export(export);
1077
1078         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1079
1080         spin_lock(&obd->obd_dev_lock);
1081         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1082                 /* shouldn't happen, but might race */
1083                 if (obd->obd_stopping)
1084                         GOTO(exit_unlock, rc = -ENODEV);
1085
1086                 rc = obd_uuid_add(obd, export);
1087                 if (rc != 0) {
1088                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1089                                       obd->obd_name, cluuid->uuid, rc);
1090                         GOTO(exit_unlock, rc = -EALREADY);
1091                 }
1092         }
1093
1094         if (!is_self) {
1095                 class_incref(obd, "export", export);
1096                 list_add_tail(&export->exp_obd_chain_timed,
1097                               &obd->obd_exports_timed);
1098                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1099                 obd->obd_num_exports++;
1100         } else {
1101                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1102                 INIT_LIST_HEAD(&export->exp_obd_chain);
1103         }
1104         spin_unlock(&obd->obd_dev_lock);
1105         RETURN(export);
1106
1107 exit_unlock:
1108         spin_unlock(&obd->obd_dev_lock);
1109         class_handle_unhash(&export->exp_handle);
1110         obd_destroy_export(export);
1111         OBD_FREE_PTR(export);
1112         return ERR_PTR(rc);
1113 }
1114
1115 struct obd_export *class_new_export(struct obd_device *obd,
1116                                     struct obd_uuid *uuid)
1117 {
1118         return __class_new_export(obd, uuid, false);
1119 }
1120 EXPORT_SYMBOL(class_new_export);
1121
1122 struct obd_export *class_new_export_self(struct obd_device *obd,
1123                                          struct obd_uuid *uuid)
1124 {
1125         return __class_new_export(obd, uuid, true);
1126 }
1127
1128 void class_unlink_export(struct obd_export *exp)
1129 {
1130         class_handle_unhash(&exp->exp_handle);
1131
1132         if (exp->exp_obd->obd_self_export == exp) {
1133                 class_export_put(exp);
1134                 return;
1135         }
1136
1137         spin_lock(&exp->exp_obd->obd_dev_lock);
1138         /* delete an uuid-export hashitem from hashtables */
1139         if (exp != exp->exp_obd->obd_self_export)
1140                 obd_uuid_del(exp->exp_obd, exp);
1141
1142 #ifdef HAVE_SERVER_SUPPORT
1143         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1144                 struct tg_export_data   *ted = &exp->exp_target_data;
1145                 struct cfs_hash         *hash;
1146
1147                 /* Because obd_gen_hash will not be released until
1148                  * class_cleanup(), so hash should never be NULL here */
1149                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1150                 LASSERT(hash != NULL);
1151                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1152                              &exp->exp_gen_hash);
1153                 cfs_hash_putref(hash);
1154         }
1155 #endif /* HAVE_SERVER_SUPPORT */
1156
1157         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1158         list_del_init(&exp->exp_obd_chain_timed);
1159         exp->exp_obd->obd_num_exports--;
1160         spin_unlock(&exp->exp_obd->obd_dev_lock);
1161         atomic_inc(&obd_stale_export_num);
1162
1163         /* A reference is kept by obd_stale_exports list */
1164         obd_stale_export_put(exp);
1165 }
1166 EXPORT_SYMBOL(class_unlink_export);
1167
1168 /* Import management functions */
1169 static void obd_zombie_import_free(struct obd_import *imp)
1170 {
1171         ENTRY;
1172
1173         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1174                 imp->imp_obd->obd_name);
1175
1176         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1177
1178         ptlrpc_put_connection_superhack(imp->imp_connection);
1179
1180         while (!list_empty(&imp->imp_conn_list)) {
1181                 struct obd_import_conn *imp_conn;
1182
1183                 imp_conn = list_entry(imp->imp_conn_list.next,
1184                                       struct obd_import_conn, oic_item);
1185                 list_del_init(&imp_conn->oic_item);
1186                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1187                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1188         }
1189
1190         LASSERT(imp->imp_sec == NULL);
1191         class_decref(imp->imp_obd, "import", imp);
1192         OBD_FREE_PTR(imp);
1193         EXIT;
1194 }
1195
1196 struct obd_import *class_import_get(struct obd_import *import)
1197 {
1198         atomic_inc(&import->imp_refcount);
1199         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1200                atomic_read(&import->imp_refcount),
1201                import->imp_obd->obd_name);
1202         return import;
1203 }
1204 EXPORT_SYMBOL(class_import_get);
1205
1206 void class_import_put(struct obd_import *imp)
1207 {
1208         ENTRY;
1209
1210         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1211
1212         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1213                atomic_read(&imp->imp_refcount) - 1,
1214                imp->imp_obd->obd_name);
1215
1216         if (atomic_dec_and_test(&imp->imp_refcount)) {
1217                 CDEBUG(D_INFO, "final put import %p\n", imp);
1218                 obd_zombie_import_add(imp);
1219         }
1220
1221         EXIT;
1222 }
1223 EXPORT_SYMBOL(class_import_put);
1224
1225 static void init_imp_at(struct imp_at *at) {
1226         int i;
1227         at_init(&at->iat_net_latency, 0, 0);
1228         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1229                 /* max service estimates are tracked on the server side, so
1230                    don't use the AT history here, just use the last reported
1231                    val. (But keep hist for proc histogram, worst_ever) */
1232                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1233                         AT_FLG_NOHIST);
1234         }
1235 }
1236
1237 static void obd_zombie_imp_cull(struct work_struct *ws)
1238 {
1239         struct obd_import *import;
1240
1241         import = container_of(ws, struct obd_import, imp_zombie_work);
1242         obd_zombie_import_free(import);
1243 }
1244
1245 struct obd_import *class_new_import(struct obd_device *obd)
1246 {
1247         struct obd_import *imp;
1248         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1249
1250         OBD_ALLOC(imp, sizeof(*imp));
1251         if (imp == NULL)
1252                 return NULL;
1253
1254         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1255         INIT_LIST_HEAD(&imp->imp_replay_list);
1256         INIT_LIST_HEAD(&imp->imp_sending_list);
1257         INIT_LIST_HEAD(&imp->imp_delayed_list);
1258         INIT_LIST_HEAD(&imp->imp_committed_list);
1259         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1260         imp->imp_known_replied_xid = 0;
1261         imp->imp_replay_cursor = &imp->imp_committed_list;
1262         spin_lock_init(&imp->imp_lock);
1263         imp->imp_last_success_conn = 0;
1264         imp->imp_state = LUSTRE_IMP_NEW;
1265         imp->imp_obd = class_incref(obd, "import", imp);
1266         rwlock_init(&imp->imp_sec_lock);
1267         init_waitqueue_head(&imp->imp_recovery_waitq);
1268         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1269
1270         if (curr_pid_ns && curr_pid_ns->child_reaper)
1271                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1272         else
1273                 imp->imp_sec_refpid = 1;
1274
1275         atomic_set(&imp->imp_refcount, 2);
1276         atomic_set(&imp->imp_unregistering, 0);
1277         atomic_set(&imp->imp_inflight, 0);
1278         atomic_set(&imp->imp_replay_inflight, 0);
1279         atomic_set(&imp->imp_inval_count, 0);
1280         INIT_LIST_HEAD(&imp->imp_conn_list);
1281         init_imp_at(&imp->imp_at);
1282
1283         /* the default magic is V2, will be used in connect RPC, and
1284          * then adjusted according to the flags in request/reply. */
1285         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1286
1287         return imp;
1288 }
1289 EXPORT_SYMBOL(class_new_import);
1290
1291 void class_destroy_import(struct obd_import *import)
1292 {
1293         LASSERT(import != NULL);
1294         LASSERT(import != LP_POISON);
1295
1296         spin_lock(&import->imp_lock);
1297         import->imp_generation++;
1298         spin_unlock(&import->imp_lock);
1299         class_import_put(import);
1300 }
1301 EXPORT_SYMBOL(class_destroy_import);
1302
1303 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1304
1305 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1306 {
1307         spin_lock(&exp->exp_locks_list_guard);
1308
1309         LASSERT(lock->l_exp_refs_nr >= 0);
1310
1311         if (lock->l_exp_refs_target != NULL &&
1312             lock->l_exp_refs_target != exp) {
1313                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1314                               exp, lock, lock->l_exp_refs_target);
1315         }
1316         if ((lock->l_exp_refs_nr ++) == 0) {
1317                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1318                 lock->l_exp_refs_target = exp;
1319         }
1320         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1321                lock, exp, lock->l_exp_refs_nr);
1322         spin_unlock(&exp->exp_locks_list_guard);
1323 }
1324 EXPORT_SYMBOL(__class_export_add_lock_ref);
1325
1326 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1327 {
1328         spin_lock(&exp->exp_locks_list_guard);
1329         LASSERT(lock->l_exp_refs_nr > 0);
1330         if (lock->l_exp_refs_target != exp) {
1331                 LCONSOLE_WARN("lock %p, "
1332                               "mismatching export pointers: %p, %p\n",
1333                               lock, lock->l_exp_refs_target, exp);
1334         }
1335         if (-- lock->l_exp_refs_nr == 0) {
1336                 list_del_init(&lock->l_exp_refs_link);
1337                 lock->l_exp_refs_target = NULL;
1338         }
1339         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1340                lock, exp, lock->l_exp_refs_nr);
1341         spin_unlock(&exp->exp_locks_list_guard);
1342 }
1343 EXPORT_SYMBOL(__class_export_del_lock_ref);
1344 #endif
1345
1346 /* A connection defines an export context in which preallocation can
1347    be managed. This releases the export pointer reference, and returns
1348    the export handle, so the export refcount is 1 when this function
1349    returns. */
1350 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1351                   struct obd_uuid *cluuid)
1352 {
1353         struct obd_export *export;
1354         LASSERT(conn != NULL);
1355         LASSERT(obd != NULL);
1356         LASSERT(cluuid != NULL);
1357         ENTRY;
1358
1359         export = class_new_export(obd, cluuid);
1360         if (IS_ERR(export))
1361                 RETURN(PTR_ERR(export));
1362
1363         conn->cookie = export->exp_handle.h_cookie;
1364         class_export_put(export);
1365
1366         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1367                cluuid->uuid, conn->cookie);
1368         RETURN(0);
1369 }
1370 EXPORT_SYMBOL(class_connect);
1371
1372 /* if export is involved in recovery then clean up related things */
1373 static void class_export_recovery_cleanup(struct obd_export *exp)
1374 {
1375         struct obd_device *obd = exp->exp_obd;
1376
1377         spin_lock(&obd->obd_recovery_task_lock);
1378         if (obd->obd_recovering) {
1379                 if (exp->exp_in_recovery) {
1380                         spin_lock(&exp->exp_lock);
1381                         exp->exp_in_recovery = 0;
1382                         spin_unlock(&exp->exp_lock);
1383                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1384                         atomic_dec(&obd->obd_connected_clients);
1385                 }
1386
1387                 /* if called during recovery then should update
1388                  * obd_stale_clients counter,
1389                  * lightweight exports are not counted */
1390                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1391                         exp->exp_obd->obd_stale_clients++;
1392         }
1393         spin_unlock(&obd->obd_recovery_task_lock);
1394
1395         spin_lock(&exp->exp_lock);
1396         /** Cleanup req replay fields */
1397         if (exp->exp_req_replay_needed) {
1398                 exp->exp_req_replay_needed = 0;
1399
1400                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1401                 atomic_dec(&obd->obd_req_replay_clients);
1402         }
1403
1404         /** Cleanup lock replay data */
1405         if (exp->exp_lock_replay_needed) {
1406                 exp->exp_lock_replay_needed = 0;
1407
1408                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1409                 atomic_dec(&obd->obd_lock_replay_clients);
1410         }
1411         spin_unlock(&exp->exp_lock);
1412 }
1413
1414 /* This function removes 1-3 references from the export:
1415  * 1 - for export pointer passed
1416  * and if disconnect really need
1417  * 2 - removing from hash
1418  * 3 - in client_unlink_export
1419  * The export pointer passed to this function can destroyed */
1420 int class_disconnect(struct obd_export *export)
1421 {
1422         int already_disconnected;
1423         ENTRY;
1424
1425         if (export == NULL) {
1426                 CWARN("attempting to free NULL export %p\n", export);
1427                 RETURN(-EINVAL);
1428         }
1429
1430         spin_lock(&export->exp_lock);
1431         already_disconnected = export->exp_disconnected;
1432         export->exp_disconnected = 1;
1433         /*  We hold references of export for uuid hash
1434          *  and nid_hash and export link at least. So
1435          *  it is safe to call cfs_hash_del in there.  */
1436         if (!hlist_unhashed(&export->exp_nid_hash))
1437                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1438                              &export->exp_connection->c_peer.nid,
1439                              &export->exp_nid_hash);
1440         spin_unlock(&export->exp_lock);
1441
1442         /* class_cleanup(), abort_recovery(), and class_fail_export()
1443          * all end up in here, and if any of them race we shouldn't
1444          * call extra class_export_puts(). */
1445         if (already_disconnected) {
1446                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1447                 GOTO(no_disconn, already_disconnected);
1448         }
1449
1450         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1451                export->exp_handle.h_cookie);
1452
1453         class_export_recovery_cleanup(export);
1454         class_unlink_export(export);
1455 no_disconn:
1456         class_export_put(export);
1457         RETURN(0);
1458 }
1459 EXPORT_SYMBOL(class_disconnect);
1460
1461 /* Return non-zero for a fully connected export */
1462 int class_connected_export(struct obd_export *exp)
1463 {
1464         int connected = 0;
1465
1466         if (exp) {
1467                 spin_lock(&exp->exp_lock);
1468                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1469                 spin_unlock(&exp->exp_lock);
1470         }
1471         return connected;
1472 }
1473 EXPORT_SYMBOL(class_connected_export);
1474
1475 static void class_disconnect_export_list(struct list_head *list,
1476                                          enum obd_option flags)
1477 {
1478         int rc;
1479         struct obd_export *exp;
1480         ENTRY;
1481
1482         /* It's possible that an export may disconnect itself, but
1483          * nothing else will be added to this list. */
1484         while (!list_empty(list)) {
1485                 exp = list_entry(list->next, struct obd_export,
1486                                  exp_obd_chain);
1487                 /* need for safe call CDEBUG after obd_disconnect */
1488                 class_export_get(exp);
1489
1490                 spin_lock(&exp->exp_lock);
1491                 exp->exp_flags = flags;
1492                 spin_unlock(&exp->exp_lock);
1493
1494                 if (obd_uuid_equals(&exp->exp_client_uuid,
1495                                     &exp->exp_obd->obd_uuid)) {
1496                         CDEBUG(D_HA,
1497                                "exp %p export uuid == obd uuid, don't discon\n",
1498                                exp);
1499                         /* Need to delete this now so we don't end up pointing
1500                          * to work_list later when this export is cleaned up. */
1501                         list_del_init(&exp->exp_obd_chain);
1502                         class_export_put(exp);
1503                         continue;
1504                 }
1505
1506                 class_export_get(exp);
1507                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1508                        "last request at %lld\n",
1509                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1510                        exp, exp->exp_last_request_time);
1511                 /* release one export reference anyway */
1512                 rc = obd_disconnect(exp);
1513
1514                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1515                        obd_export_nid2str(exp), exp, rc);
1516                 class_export_put(exp);
1517         }
1518         EXIT;
1519 }
1520
1521 void class_disconnect_exports(struct obd_device *obd)
1522 {
1523         LIST_HEAD(work_list);
1524         ENTRY;
1525
1526         /* Move all of the exports from obd_exports to a work list, en masse. */
1527         spin_lock(&obd->obd_dev_lock);
1528         list_splice_init(&obd->obd_exports, &work_list);
1529         list_splice_init(&obd->obd_delayed_exports, &work_list);
1530         spin_unlock(&obd->obd_dev_lock);
1531
1532         if (!list_empty(&work_list)) {
1533                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1534                        "disconnecting them\n", obd->obd_minor, obd);
1535                 class_disconnect_export_list(&work_list,
1536                                              exp_flags_from_obd(obd));
1537         } else
1538                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1539                        obd->obd_minor, obd);
1540         EXIT;
1541 }
1542 EXPORT_SYMBOL(class_disconnect_exports);
1543
1544 /* Remove exports that have not completed recovery.
1545  */
1546 void class_disconnect_stale_exports(struct obd_device *obd,
1547                                     int (*test_export)(struct obd_export *))
1548 {
1549         LIST_HEAD(work_list);
1550         struct obd_export *exp, *n;
1551         int evicted = 0;
1552         ENTRY;
1553
1554         spin_lock(&obd->obd_dev_lock);
1555         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1556                                  exp_obd_chain) {
1557                 /* don't count self-export as client */
1558                 if (obd_uuid_equals(&exp->exp_client_uuid,
1559                                     &exp->exp_obd->obd_uuid))
1560                         continue;
1561
1562                 /* don't evict clients which have no slot in last_rcvd
1563                  * (e.g. lightweight connection) */
1564                 if (exp->exp_target_data.ted_lr_idx == -1)
1565                         continue;
1566
1567                 spin_lock(&exp->exp_lock);
1568                 if (exp->exp_failed || test_export(exp)) {
1569                         spin_unlock(&exp->exp_lock);
1570                         continue;
1571                 }
1572                 exp->exp_failed = 1;
1573                 spin_unlock(&exp->exp_lock);
1574
1575                 list_move(&exp->exp_obd_chain, &work_list);
1576                 evicted++;
1577                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1578                        obd->obd_name, exp->exp_client_uuid.uuid,
1579                        obd_export_nid2str(exp));
1580                 print_export_data(exp, "EVICTING", 0, D_HA);
1581         }
1582         spin_unlock(&obd->obd_dev_lock);
1583
1584         if (evicted)
1585                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1586                               obd->obd_name, evicted);
1587
1588         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1589                                                  OBD_OPT_ABORT_RECOV);
1590         EXIT;
1591 }
1592 EXPORT_SYMBOL(class_disconnect_stale_exports);
1593
1594 void class_fail_export(struct obd_export *exp)
1595 {
1596         int rc, already_failed;
1597
1598         spin_lock(&exp->exp_lock);
1599         already_failed = exp->exp_failed;
1600         exp->exp_failed = 1;
1601         spin_unlock(&exp->exp_lock);
1602
1603         if (already_failed) {
1604                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1605                        exp, exp->exp_client_uuid.uuid);
1606                 return;
1607         }
1608
1609         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1610                exp, exp->exp_client_uuid.uuid);
1611
1612         if (obd_dump_on_timeout)
1613                 libcfs_debug_dumplog();
1614
1615         /* need for safe call CDEBUG after obd_disconnect */
1616         class_export_get(exp);
1617
1618         /* Most callers into obd_disconnect are removing their own reference
1619          * (request, for example) in addition to the one from the hash table.
1620          * We don't have such a reference here, so make one. */
1621         class_export_get(exp);
1622         rc = obd_disconnect(exp);
1623         if (rc)
1624                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1625         else
1626                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1627                        exp, exp->exp_client_uuid.uuid);
1628         class_export_put(exp);
1629 }
1630 EXPORT_SYMBOL(class_fail_export);
1631
1632 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1633 {
1634         struct cfs_hash *nid_hash;
1635         struct obd_export *doomed_exp = NULL;
1636         int exports_evicted = 0;
1637
1638         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1639
1640         spin_lock(&obd->obd_dev_lock);
1641         /* umount has run already, so evict thread should leave
1642          * its task to umount thread now */
1643         if (obd->obd_stopping) {
1644                 spin_unlock(&obd->obd_dev_lock);
1645                 return exports_evicted;
1646         }
1647         nid_hash = obd->obd_nid_hash;
1648         cfs_hash_getref(nid_hash);
1649         spin_unlock(&obd->obd_dev_lock);
1650
1651         do {
1652                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1653                 if (doomed_exp == NULL)
1654                         break;
1655
1656                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1657                          "nid %s found, wanted nid %s, requested nid %s\n",
1658                          obd_export_nid2str(doomed_exp),
1659                          libcfs_nid2str(nid_key), nid);
1660                 LASSERTF(doomed_exp != obd->obd_self_export,
1661                          "self-export is hashed by NID?\n");
1662                 exports_evicted++;
1663                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1664                               "request\n", obd->obd_name,
1665                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1666                               obd_export_nid2str(doomed_exp));
1667                 class_fail_export(doomed_exp);
1668                 class_export_put(doomed_exp);
1669         } while (1);
1670
1671         cfs_hash_putref(nid_hash);
1672
1673         if (!exports_evicted)
1674                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1675                        obd->obd_name, nid);
1676         return exports_evicted;
1677 }
1678 EXPORT_SYMBOL(obd_export_evict_by_nid);
1679
1680 #ifdef HAVE_SERVER_SUPPORT
1681 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1682 {
1683         struct obd_export *doomed_exp = NULL;
1684         struct obd_uuid doomed_uuid;
1685         int exports_evicted = 0;
1686
1687         spin_lock(&obd->obd_dev_lock);
1688         if (obd->obd_stopping) {
1689                 spin_unlock(&obd->obd_dev_lock);
1690                 return exports_evicted;
1691         }
1692         spin_unlock(&obd->obd_dev_lock);
1693
1694         obd_str2uuid(&doomed_uuid, uuid);
1695         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1696                 CERROR("%s: can't evict myself\n", obd->obd_name);
1697                 return exports_evicted;
1698         }
1699
1700         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1701         if (doomed_exp == NULL) {
1702                 CERROR("%s: can't disconnect %s: no exports found\n",
1703                        obd->obd_name, uuid);
1704         } else {
1705                 CWARN("%s: evicting %s at adminstrative request\n",
1706                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1707                 class_fail_export(doomed_exp);
1708                 class_export_put(doomed_exp);
1709                 obd_uuid_del(obd, doomed_exp);
1710                 exports_evicted++;
1711         }
1712
1713         return exports_evicted;
1714 }
1715 #endif /* HAVE_SERVER_SUPPORT */
1716
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback, unset (NULL) by default.  When set, print_export_data()
 * calls it for an export if lock dumping was requested.
 */
void (*class_export_dump_hook)(struct obd_export *exp);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1721
1722 static void print_export_data(struct obd_export *exp, const char *status,
1723                               int locks, int debug_level)
1724 {
1725         struct ptlrpc_reply_state *rs;
1726         struct ptlrpc_reply_state *first_reply = NULL;
1727         int nreplies = 0;
1728
1729         spin_lock(&exp->exp_lock);
1730         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1731                             rs_exp_list) {
1732                 if (nreplies == 0)
1733                         first_reply = rs;
1734                 nreplies++;
1735         }
1736         spin_unlock(&exp->exp_lock);
1737
1738         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1739                "%p %s %llu stale:%d\n",
1740                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1741                obd_export_nid2str(exp),
1742                refcount_read(&exp->exp_handle.h_ref),
1743                atomic_read(&exp->exp_rpc_count),
1744                atomic_read(&exp->exp_cb_count),
1745                atomic_read(&exp->exp_locks_count),
1746                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1747                nreplies, first_reply, nreplies > 3 ? "..." : "",
1748                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1749 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1750         if (locks && class_export_dump_hook != NULL)
1751                 class_export_dump_hook(exp);
1752 #endif
1753 }
1754
1755 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1756 {
1757         struct obd_export *exp;
1758
1759         spin_lock(&obd->obd_dev_lock);
1760         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1761                 print_export_data(exp, "ACTIVE", locks, debug_level);
1762         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1763                 print_export_data(exp, "UNLINKED", locks, debug_level);
1764         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1765                 print_export_data(exp, "DELAYED", locks, debug_level);
1766         spin_unlock(&obd->obd_dev_lock);
1767 }
1768
1769 void obd_exports_barrier(struct obd_device *obd)
1770 {
1771         int waited = 2;
1772         LASSERT(list_empty(&obd->obd_exports));
1773         spin_lock(&obd->obd_dev_lock);
1774         while (!list_empty(&obd->obd_unlinked_exports)) {
1775                 spin_unlock(&obd->obd_dev_lock);
1776                 set_current_state(TASK_UNINTERRUPTIBLE);
1777                 schedule_timeout(cfs_time_seconds(waited));
1778                 if (waited > 5 && is_power_of_2(waited)) {
1779                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1780                                       "more than %d seconds. "
1781                                       "The obd refcount = %d. Is it stuck?\n",
1782                                       obd->obd_name, waited,
1783                                       atomic_read(&obd->obd_refcount));
1784                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1785                 }
1786                 waited *= 2;
1787                 spin_lock(&obd->obd_dev_lock);
1788         }
1789         spin_unlock(&obd->obd_dev_lock);
1790 }
1791 EXPORT_SYMBOL(obd_exports_barrier);
1792
1793 /**
1794  * Add export to the obd_zombe thread and notify it.
1795  */
1796 static void obd_zombie_export_add(struct obd_export *exp) {
1797         atomic_dec(&obd_stale_export_num);
1798         spin_lock(&exp->exp_obd->obd_dev_lock);
1799         LASSERT(!list_empty(&exp->exp_obd_chain));
1800         list_del_init(&exp->exp_obd_chain);
1801         spin_unlock(&exp->exp_obd->obd_dev_lock);
1802
1803         queue_work(zombie_wq, &exp->exp_zombie_work);
1804 }
1805
1806 /**
1807  * Add import to the obd_zombe thread and notify it.
1808  */
1809 static void obd_zombie_import_add(struct obd_import *imp) {
1810         LASSERT(imp->imp_sec == NULL);
1811
1812         queue_work(zombie_wq, &imp->imp_zombie_work);
1813 }
1814
1815 /**
1816  * wait when obd_zombie import/export queues become empty
1817  */
1818 void obd_zombie_barrier(void)
1819 {
1820         flush_workqueue(zombie_wq);
1821 }
1822 EXPORT_SYMBOL(obd_zombie_barrier);
1823
1824
1825 struct obd_export *obd_stale_export_get(void)
1826 {
1827         struct obd_export *exp = NULL;
1828         ENTRY;
1829
1830         spin_lock(&obd_stale_export_lock);
1831         if (!list_empty(&obd_stale_exports)) {
1832                 exp = list_entry(obd_stale_exports.next,
1833                                  struct obd_export, exp_stale_list);
1834                 list_del_init(&exp->exp_stale_list);
1835         }
1836         spin_unlock(&obd_stale_export_lock);
1837
1838         if (exp) {
1839                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1840                        atomic_read(&obd_stale_export_num));
1841         }
1842         RETURN(exp);
1843 }
1844 EXPORT_SYMBOL(obd_stale_export_get);
1845
1846 void obd_stale_export_put(struct obd_export *exp)
1847 {
1848         ENTRY;
1849
1850         LASSERT(list_empty(&exp->exp_stale_list));
1851         if (exp->exp_lock_hash &&
1852             atomic_read(&exp->exp_lock_hash->hs_count)) {
1853                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1854                        atomic_read(&obd_stale_export_num));
1855
1856                 spin_lock_bh(&exp->exp_bl_list_lock);
1857                 spin_lock(&obd_stale_export_lock);
1858                 /* Add to the tail if there is no blocked locks,
1859                  * to the head otherwise. */
1860                 if (list_empty(&exp->exp_bl_list))
1861                         list_add_tail(&exp->exp_stale_list,
1862                                       &obd_stale_exports);
1863                 else
1864                         list_add(&exp->exp_stale_list,
1865                                  &obd_stale_exports);
1866
1867                 spin_unlock(&obd_stale_export_lock);
1868                 spin_unlock_bh(&exp->exp_bl_list_lock);
1869         } else {
1870                 class_export_put(exp);
1871         }
1872         EXIT;
1873 }
1874 EXPORT_SYMBOL(obd_stale_export_put);
1875
1876 /**
1877  * Adjust the position of the export in the stale list,
1878  * i.e. move to the head of the list if is needed.
1879  **/
1880 void obd_stale_export_adjust(struct obd_export *exp)
1881 {
1882         LASSERT(exp != NULL);
1883         spin_lock_bh(&exp->exp_bl_list_lock);
1884         spin_lock(&obd_stale_export_lock);
1885
1886         if (!list_empty(&exp->exp_stale_list) &&
1887             !list_empty(&exp->exp_bl_list))
1888                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1889
1890         spin_unlock(&obd_stale_export_lock);
1891         spin_unlock_bh(&exp->exp_bl_list_lock);
1892 }
1893 EXPORT_SYMBOL(obd_stale_export_adjust);
1894
1895 /**
1896  * start destroy zombie import/export thread
1897  */
1898 int obd_zombie_impexp_init(void)
1899 {
1900         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1901         if (!zombie_wq)
1902                 return -ENOMEM;
1903
1904         return 0;
1905 }
1906
1907 /**
1908  * stop destroy zombie import/export thread
1909  */
1910 void obd_zombie_impexp_stop(void)
1911 {
1912         destroy_workqueue(zombie_wq);
1913         LASSERT(list_empty(&obd_stale_exports));
1914 }
1915
1916 /***** Kernel-userspace comm helpers *******/
1917
1918 /* Get length of entire message, including header */
1919 int kuc_len(int payload_len)
1920 {
1921         return sizeof(struct kuc_hdr) + payload_len;
1922 }
1923 EXPORT_SYMBOL(kuc_len);
1924
1925 /* Get a pointer to kuc header, given a ptr to the payload
1926  * @param p Pointer to payload area
1927  * @returns Pointer to kuc header
1928  */
1929 struct kuc_hdr * kuc_ptr(void *p)
1930 {
1931         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1932         LASSERT(lh->kuc_magic == KUC_MAGIC);
1933         return lh;
1934 }
1935 EXPORT_SYMBOL(kuc_ptr);
1936
1937 /* Alloc space for a message, and fill in header
1938  * @return Pointer to payload area
1939  */
1940 void *kuc_alloc(int payload_len, int transport, int type)
1941 {
1942         struct kuc_hdr *lh;
1943         int len = kuc_len(payload_len);
1944
1945         OBD_ALLOC(lh, len);
1946         if (lh == NULL)
1947                 return ERR_PTR(-ENOMEM);
1948
1949         lh->kuc_magic = KUC_MAGIC;
1950         lh->kuc_transport = transport;
1951         lh->kuc_msgtype = type;
1952         lh->kuc_msglen = len;
1953
1954         return (void *)(lh + 1);
1955 }
1956 EXPORT_SYMBOL(kuc_alloc);
1957
/*
 * Free a message given a pointer @p to its payload area and the
 * @payload_len it was sized for.  The header is located with kuc_ptr()
 * and the whole message (header plus payload) is released.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh;

	lh = kuc_ptr(p);
	OBD_FREE(lh, kuc_len(payload_len));
}
1964 EXPORT_SYMBOL(kuc_free);
1965
1966 struct obd_request_slot_waiter {
1967         struct list_head        orsw_entry;
1968         wait_queue_head_t       orsw_waitq;
1969         bool                    orsw_signaled;
1970 };
1971
1972 static bool obd_request_slot_avail(struct client_obd *cli,
1973                                    struct obd_request_slot_waiter *orsw)
1974 {
1975         bool avail;
1976
1977         spin_lock(&cli->cl_loi_list_lock);
1978         avail = !!list_empty(&orsw->orsw_entry);
1979         spin_unlock(&cli->cl_loi_list_lock);
1980
1981         return avail;
1982 };
1983
1984 /*
1985  * For network flow control, the RPC sponsor needs to acquire a credit
1986  * before sending the RPC. The credits count for a connection is defined
1987  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1988  * the subsequent RPC sponsors need to wait until others released their
1989  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1990  */
1991 int obd_get_request_slot(struct client_obd *cli)
1992 {
1993         struct obd_request_slot_waiter   orsw;
1994         struct l_wait_info               lwi;
1995         int                              rc;
1996
1997         spin_lock(&cli->cl_loi_list_lock);
1998         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1999                 cli->cl_rpcs_in_flight++;
2000                 spin_unlock(&cli->cl_loi_list_lock);
2001                 return 0;
2002         }
2003
2004         init_waitqueue_head(&orsw.orsw_waitq);
2005         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2006         orsw.orsw_signaled = false;
2007         spin_unlock(&cli->cl_loi_list_lock);
2008
2009         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2010         rc = l_wait_event(orsw.orsw_waitq,
2011                           obd_request_slot_avail(cli, &orsw) ||
2012                           orsw.orsw_signaled,
2013                           &lwi);
2014
2015         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2016          * freed but other (such as obd_put_request_slot) is using it. */
2017         spin_lock(&cli->cl_loi_list_lock);
2018         if (rc != 0) {
2019                 if (!orsw.orsw_signaled) {
2020                         if (list_empty(&orsw.orsw_entry))
2021                                 cli->cl_rpcs_in_flight--;
2022                         else
2023                                 list_del(&orsw.orsw_entry);
2024                 }
2025         }
2026
2027         if (orsw.orsw_signaled) {
2028                 LASSERT(list_empty(&orsw.orsw_entry));
2029
2030                 rc = -EINTR;
2031         }
2032         spin_unlock(&cli->cl_loi_list_lock);
2033
2034         return rc;
2035 }
2036 EXPORT_SYMBOL(obd_get_request_slot);
2037
2038 void obd_put_request_slot(struct client_obd *cli)
2039 {
2040         struct obd_request_slot_waiter *orsw;
2041
2042         spin_lock(&cli->cl_loi_list_lock);
2043         cli->cl_rpcs_in_flight--;
2044
2045         /* If there is free slot, wakeup the first waiter. */
2046         if (!list_empty(&cli->cl_flight_waiters) &&
2047             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2048                 orsw = list_entry(cli->cl_flight_waiters.next,
2049                                   struct obd_request_slot_waiter, orsw_entry);
2050                 list_del_init(&orsw->orsw_entry);
2051                 cli->cl_rpcs_in_flight++;
2052                 wake_up(&orsw->orsw_waitq);
2053         }
2054         spin_unlock(&cli->cl_loi_list_lock);
2055 }
2056 EXPORT_SYMBOL(obd_put_request_slot);
2057
2058 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2059 {
2060         return cli->cl_max_rpcs_in_flight;
2061 }
2062 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2063
2064 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2065 {
2066         struct obd_request_slot_waiter *orsw;
2067         __u32                           old;
2068         int                             diff;
2069         int                             i;
2070         const char *type_name;
2071         int                             rc;
2072
2073         if (max > OBD_MAX_RIF_MAX || max < 1)
2074                 return -ERANGE;
2075
2076         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2077         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2078                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2079                  * strictly lower that max_rpcs_in_flight */
2080                 if (max < 2) {
2081                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2082                                "because it must be higher than "
2083                                "max_mod_rpcs_in_flight value",
2084                                cli->cl_import->imp_obd->obd_name);
2085                         return -ERANGE;
2086                 }
2087                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2088                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2089                         if (rc != 0)
2090                                 return rc;
2091                 }
2092         }
2093
2094         spin_lock(&cli->cl_loi_list_lock);
2095         old = cli->cl_max_rpcs_in_flight;
2096         cli->cl_max_rpcs_in_flight = max;
2097         client_adjust_max_dirty(cli);
2098
2099         diff = max - old;
2100
2101         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2102         for (i = 0; i < diff; i++) {
2103                 if (list_empty(&cli->cl_flight_waiters))
2104                         break;
2105
2106                 orsw = list_entry(cli->cl_flight_waiters.next,
2107                                   struct obd_request_slot_waiter, orsw_entry);
2108                 list_del_init(&orsw->orsw_entry);
2109                 cli->cl_rpcs_in_flight++;
2110                 wake_up(&orsw->orsw_waitq);
2111         }
2112         spin_unlock(&cli->cl_loi_list_lock);
2113
2114         return 0;
2115 }
2116 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2117
2118 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2119 {
2120         return cli->cl_max_mod_rpcs_in_flight;
2121 }
2122 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2123
2124 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2125 {
2126         struct obd_connect_data *ocd;
2127         __u16 maxmodrpcs;
2128         __u16 prev;
2129
2130         if (max > OBD_MAX_RIF_MAX || max < 1)
2131                 return -ERANGE;
2132
2133         /* cannot exceed or equal max_rpcs_in_flight */
2134         if (max >= cli->cl_max_rpcs_in_flight) {
2135                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2136                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2137                        cli->cl_import->imp_obd->obd_name,
2138                        max, cli->cl_max_rpcs_in_flight);
2139                 return -ERANGE;
2140         }
2141
2142         /* cannot exceed max modify RPCs in flight supported by the server */
2143         ocd = &cli->cl_import->imp_connect_data;
2144         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2145                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2146         else
2147                 maxmodrpcs = 1;
2148         if (max > maxmodrpcs) {
2149                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2150                        "higher than max_mod_rpcs_per_client value (%hu) "
2151                        "returned by the server at connection\n",
2152                        cli->cl_import->imp_obd->obd_name,
2153                        max, maxmodrpcs);
2154                 return -ERANGE;
2155         }
2156
2157         spin_lock(&cli->cl_mod_rpcs_lock);
2158
2159         prev = cli->cl_max_mod_rpcs_in_flight;
2160         cli->cl_max_mod_rpcs_in_flight = max;
2161
2162         /* wakeup waiters if limit has been increased */
2163         if (cli->cl_max_mod_rpcs_in_flight > prev)
2164                 wake_up(&cli->cl_mod_rpcs_waitq);
2165
2166         spin_unlock(&cli->cl_mod_rpcs_lock);
2167
2168         return 0;
2169 }
2170 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2171
2172 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2173                                struct seq_file *seq)
2174 {
2175         unsigned long mod_tot = 0, mod_cum;
2176         struct timespec64 now;
2177         int i;
2178
2179         ktime_get_real_ts64(&now);
2180
2181         spin_lock(&cli->cl_mod_rpcs_lock);
2182
2183         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2184                    (s64)now.tv_sec, now.tv_nsec);
2185         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2186                    cli->cl_mod_rpcs_in_flight);
2187
2188         seq_printf(seq, "\n\t\t\tmodify\n");
2189         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2190
2191         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2192
2193         mod_cum = 0;
2194         for (i = 0; i < OBD_HIST_MAX; i++) {
2195                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2196                 mod_cum += mod;
2197                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2198                            i, mod, pct(mod, mod_tot),
2199                            pct(mod_cum, mod_tot));
2200                 if (mod_cum == mod_tot)
2201                         break;
2202         }
2203
2204         spin_unlock(&cli->cl_mod_rpcs_lock);
2205
2206         return 0;
2207 }
2208 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2209
2210 /* The number of modify RPCs sent in parallel is limited
2211  * because the server has a finite number of slots per client to
2212  * store request result and ensure reply reconstruction when needed.
2213  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2214  * that takes into account server limit and cl_max_rpcs_in_flight
2215  * value.
2216  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2217  * one close request is allowed above the maximum.
2218  */
2219 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2220                                                  bool close_req)
2221 {
2222         bool avail;
2223
2224         /* A slot is available if
2225          * - number of modify RPCs in flight is less than the max
2226          * - it's a close RPC and no other close request is in flight
2227          */
2228         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2229                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2230
2231         return avail;
2232 }
2233
2234 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2235                                          bool close_req)
2236 {
2237         bool avail;
2238
2239         spin_lock(&cli->cl_mod_rpcs_lock);
2240         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2241         spin_unlock(&cli->cl_mod_rpcs_lock);
2242         return avail;
2243 }
2244
2245
2246 /* Get a modify RPC slot from the obd client @cli according
2247  * to the kind of operation @opc that is going to be sent
2248  * and the intent @it of the operation if it applies.
2249  * If the maximum number of modify RPCs in flight is reached
2250  * the thread is put to sleep.
2251  * Returns the tag to be set in the request message. Tag 0
2252  * is reserved for non-modifying requests.
2253  */
2254 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2255 {
2256         bool                    close_req = false;
2257         __u16                   i, max;
2258
2259         if (opc == MDS_CLOSE)
2260                 close_req = true;
2261
2262         do {
2263                 spin_lock(&cli->cl_mod_rpcs_lock);
2264                 max = cli->cl_max_mod_rpcs_in_flight;
2265                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2266                         /* there is a slot available */
2267                         cli->cl_mod_rpcs_in_flight++;
2268                         if (close_req)
2269                                 cli->cl_close_rpcs_in_flight++;
2270                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2271                                          cli->cl_mod_rpcs_in_flight);
2272                         /* find a free tag */
2273                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2274                                                 max + 1);
2275                         LASSERT(i < OBD_MAX_RIF_MAX);
2276                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2277                         spin_unlock(&cli->cl_mod_rpcs_lock);
2278                         /* tag 0 is reserved for non-modify RPCs */
2279
2280                         CDEBUG(D_RPCTRACE,
2281                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2282                                cli->cl_import->imp_obd->obd_name,
2283                                i + 1, opc, max);
2284
2285                         return i + 1;
2286                 }
2287                 spin_unlock(&cli->cl_mod_rpcs_lock);
2288
2289                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2290                        "opc %u, max %hu\n",
2291                        cli->cl_import->imp_obd->obd_name, opc, max);
2292
2293                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2294                                           obd_mod_rpc_slot_avail(cli,
2295                                                                  close_req));
2296         } while (true);
2297 }
2298 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2299
2300 /* Put a modify RPC slot from the obd client @cli according
2301  * to the kind of operation @opc that has been sent.
2302  */
2303 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2304 {
2305         bool                    close_req = false;
2306
2307         if (tag == 0)
2308                 return;
2309
2310         if (opc == MDS_CLOSE)
2311                 close_req = true;
2312
2313         spin_lock(&cli->cl_mod_rpcs_lock);
2314         cli->cl_mod_rpcs_in_flight--;
2315         if (close_req)
2316                 cli->cl_close_rpcs_in_flight--;
2317         /* release the tag in the bitmap */
2318         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2319         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2320         spin_unlock(&cli->cl_mod_rpcs_lock);
2321         wake_up(&cli->cl_mod_rpcs_waitq);
2322 }
2323 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2324