Whamcloud - gitweb
LU-12542 handle: rename ops to owner
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Lock taken around every lookup in, and update of, obd_devs[] below. */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device sits at index obd_minor. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache used by obd_device_alloc() and obd_device_free(). */
static struct kmem_cache *obd_device_cachep;
/* kobject type shared by all obd_type objects; defined later in this file. */
static struct kobj_type class_ktype;
/* NOTE(review): presumably the workqueue for zombie export/import teardown;
 * its users are outside the visible part of this file.
 */
static struct workqueue_struct *zombie_wq;

/* Forward declarations of helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* Stale-export bookkeeping: a list, its spinlock, and an entry counter. */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/* Hook called by class_export_destroy() to drop an export's connection.
 * NOTE(review): it is assigned elsewhere (not in the visible source).
 */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         module_put(type->typ_dt_ops->o_owner);
158         atomic_dec(&type->typ_refcnt);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         OBD_FREE(type, sizeof(*type));
176 }
177
/* kobject type of every obd_type: class_search_type() uses its address to
 * recognise obd_type kobjects, and its release hook frees the type.
 */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
182
#ifdef HAVE_SERVER_SUPPORT
/**
 * Pre-create a placeholder obd type named \a name.
 *
 * The new type gets a kobject in lustre_kset and a debugfs directory, and is
 * flagged with typ_sym_filter so that a later class_register_type() for the
 * same name reuses it instead of failing with -EEXIST.
 *
 * \param[in] name         type name
 * \param[in] enable_proc  also create a /proc directory for the type; a
 *                         failure there is logged and otherwise ignored
 *
 * \retval type             pointer to the new obd_type
 * \retval ERR_PTR(-EEXIST) a type with this name already exists
 * \retval ERR_PTR(-ENOMEM) allocation failed
 * \retval ERR_PTR(errno)   kobject or debugfs registration failed
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
        struct dentry *symlink;
        struct obd_type *type;
        int rc;

        type = class_search_type(name);
        if (type) {
                kobject_put(&type->typ_kobj);
                return ERR_PTR(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (!type)
                return ERR_PTR(-ENOMEM);

        type->typ_kobj.kset = lustre_kset;
        rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
                                  &lustre_kset->kobj, "%s", name);
        if (rc) {
                /* the kobject is initialised even when the add step fails;
                 * it must be put so that class_sysfs_release() frees type
                 */
                kobject_put(&type->typ_kobj);
                return ERR_PTR(rc);
        }

        symlink = debugfs_create_dir(name, debugfs_lustre_root);
        if (IS_ERR_OR_NULL(symlink)) {
                rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
                kobject_put(&type->typ_kobj);
                return ERR_PTR(rc);
        }
        type->typ_debugfs_entry = symlink;
        type->typ_sym_filter = true;

        if (enable_proc) {
                type->typ_procroot = lprocfs_register(name, proc_lustre_root,
                                                      NULL, NULL);
                if (IS_ERR(type->typ_procroot)) {
                        CERROR("%s: can't create compat proc entry: %d\n",
                               name, (int)PTR_ERR(type->typ_procroot));
                        type->typ_procroot = NULL;
                }
        }

        return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
229
/* Upper bound (exclusive) on the length of an obd type name. */
#define CLASS_MAX_NAME 1024

/**
 * Register an obd type called \a name.
 *
 * Creates (or, with HAVE_SERVER_SUPPORT, reuses a placeholder made by
 * class_add_symlinks()) an obd_type, stores the operation tables in it and
 * publishes it through procfs/debugfs and the lustre_kset kobject set.
 *
 * \param[in] dt_ops       obd operations, stored in typ_dt_ops
 * \param[in] md_ops       metadata operations, stored in typ_md_ops
 * \param[in] enable_proc  create a /proc directory (CONFIG_PROC_FS only)
 * \param[in] vars         passed to ldebugfs_register()
 * \param[in] name         type name, must be shorter than CLASS_MAX_NAME
 * \param[in] ldt          optional lu_device_type, initialised with
 *                         lu_device_type_init()
 *
 * \retval 0        success
 * \retval -EEXIST  a type with this name is already registered
 * \retval -ENOMEM  allocation failed
 * \retval errno    error returned by one of the registration steps
 */
int class_register_type(const struct obd_ops *dt_ops,
                        const struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* placeholder created by class_add_symlinks(): reuse it,
                 * skipping the allocation and kobject_init() below
                 */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                /* drop the reference returned by class_search_type() */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_dt_ops = dt_ops;
        type->typ_md_ops = md_ops;

#ifdef HAVE_SERVER_SUPPORT
        if (type->typ_sym_filter) {
                /* the placeholder is already published; clear the flag, drop
                 * the reference taken by class_search_type() above and go
                 * straight to the lu_device_type setup
                 */
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* never leave an ERR_PTR behind for the release hook */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
                                                    vars, type);
        if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
                /* a NULL result is reported as -ENOMEM */
                rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
                                             : -ENOMEM;
                type->typ_debugfs_entry = NULL;
                GOTO(failed, rc);
        }

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                type->typ_lu = ldt;
                rc = lu_device_type_init(ldt);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /* cleanup is left to class_sysfs_release(), the release hook of
         * class_ktype, once the kobject reference count drops to zero
         */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
317
318 int class_unregister_type(const char *name)
319 {
320         struct obd_type *type = class_search_type(name);
321         int rc = 0;
322         ENTRY;
323
324         if (!type) {
325                 CERROR("unknown obd type\n");
326                 RETURN(-EINVAL);
327         }
328
329         if (atomic_read(&type->typ_refcnt)) {
330                 CERROR("type %s has refcount (%d)\n", name,
331                        atomic_read(&type->typ_refcnt));
332                 /* This is a bad situation, let's make the best of it */
333                 /* Remove ops, but leave the name for debugging */
334                 type->typ_dt_ops = NULL;
335                 type->typ_md_ops = NULL;
336                 GOTO(out_put, rc = -EBUSY);
337         }
338
339         /* Put the final ref */
340         kobject_put(&type->typ_kobj);
341 out_put:
342         /* Put the ref returned by class_search_type() */
343         kobject_put(&type->typ_kobj);
344
345         RETURN(rc);
346 } /* class_unregister_type */
347 EXPORT_SYMBOL(class_unregister_type);
348
349 /**
350  * Create a new obd device.
351  *
352  * Allocate the new obd_device and initialize it.
353  *
354  * \param[in] type_name obd device type string.
355  * \param[in] name      obd device name.
356  * \param[in] uuid      obd device UUID
357  *
358  * \retval newdev         pointer to created obd_device
359  * \retval ERR_PTR(errno) on error
360  */
361 struct obd_device *class_newdev(const char *type_name, const char *name,
362                                 const char *uuid)
363 {
364         struct obd_device *newdev;
365         struct obd_type *type = NULL;
366         ENTRY;
367
368         if (strlen(name) >= MAX_OBD_NAME) {
369                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
370                 RETURN(ERR_PTR(-EINVAL));
371         }
372
373         type = class_get_type(type_name);
374         if (type == NULL){
375                 CERROR("OBD: unknown type: %s\n", type_name);
376                 RETURN(ERR_PTR(-ENODEV));
377         }
378
379         newdev = obd_device_alloc();
380         if (newdev == NULL) {
381                 class_put_type(type);
382                 RETURN(ERR_PTR(-ENOMEM));
383         }
384         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
385         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
386         newdev->obd_type = type;
387         newdev->obd_minor = -1;
388
389         rwlock_init(&newdev->obd_pool_lock);
390         newdev->obd_pool_limit = 0;
391         newdev->obd_pool_slv = 0;
392
393         INIT_LIST_HEAD(&newdev->obd_exports);
394         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
395         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
396         INIT_LIST_HEAD(&newdev->obd_exports_timed);
397         INIT_LIST_HEAD(&newdev->obd_nid_stats);
398         spin_lock_init(&newdev->obd_nid_lock);
399         spin_lock_init(&newdev->obd_dev_lock);
400         mutex_init(&newdev->obd_dev_mutex);
401         spin_lock_init(&newdev->obd_osfs_lock);
402         /* newdev->obd_osfs_age must be set to a value in the distant
403          * past to guarantee a fresh statfs is fetched on mount. */
404         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
405
406         /* XXX belongs in setup not attach  */
407         init_rwsem(&newdev->obd_observer_link_sem);
408         /* recovery data */
409         spin_lock_init(&newdev->obd_recovery_task_lock);
410         init_waitqueue_head(&newdev->obd_next_transno_waitq);
411         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
412         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
413         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
414         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
415         INIT_LIST_HEAD(&newdev->obd_evict_list);
416         INIT_LIST_HEAD(&newdev->obd_lwp_list);
417
418         llog_group_init(&newdev->obd_olg);
419         /* Detach drops this */
420         atomic_set(&newdev->obd_refcount, 1);
421         lu_ref_init(&newdev->obd_reference);
422         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
423
424         newdev->obd_conn_inprogress = 0;
425
426         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
427
428         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
429                newdev->obd_name, newdev);
430
431         return newdev;
432 }
433
434 /**
435  * Free obd device.
436  *
437  * \param[in] obd obd_device to be freed
438  *
439  * \retval none
440  */
441 void class_free_dev(struct obd_device *obd)
442 {
443         struct obd_type *obd_type = obd->obd_type;
444
445         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
446                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
448                  "obd %p != obd_devs[%d] %p\n",
449                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
450         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
451                  "obd_refcount should be 0, not %d\n",
452                  atomic_read(&obd->obd_refcount));
453         LASSERT(obd_type != NULL);
454
455         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
456                obd->obd_name, obd->obd_type->typ_name);
457
458         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
459                          obd->obd_name, obd->obd_uuid.uuid);
460         if (obd->obd_stopping) {
461                 int err;
462
463                 /* If we're not stopping, we were never set up */
464                 err = obd_cleanup(obd);
465                 if (err)
466                         CERROR("Cleanup %s returned %d\n",
467                                 obd->obd_name, err);
468         }
469
470         obd_device_free(obd);
471
472         class_put_type(obd_type);
473 }
474
475 /**
476  * Unregister obd device.
477  *
478  * Free slot in obd_devs[] used by \a obd.
479  *
480  * \param[in] obd obd_device to be unregistered
481  *
482  * \retval none
483  */
484 void class_unregister_device(struct obd_device *obd)
485 {
486         write_lock(&obd_dev_lock);
487         if (obd->obd_minor >= 0) {
488                 LASSERT(obd_devs[obd->obd_minor] == obd);
489                 obd_devs[obd->obd_minor] = NULL;
490                 obd->obd_minor = -1;
491         }
492         write_unlock(&obd_dev_lock);
493 }
494
495 /**
496  * Register obd device.
497  *
498  * Find free slot in obd_devs[], fills it with \a new_obd.
499  *
500  * \param[in] new_obd obd_device to be registered
501  *
502  * \retval 0          success
503  * \retval -EEXIST    device with this name is registered
504  * \retval -EOVERFLOW obd_devs[] is full
505  */
506 int class_register_device(struct obd_device *new_obd)
507 {
508         int ret = 0;
509         int i;
510         int new_obd_minor = 0;
511         bool minor_assign = false;
512         bool retried = false;
513
514 again:
515         write_lock(&obd_dev_lock);
516         for (i = 0; i < class_devno_max(); i++) {
517                 struct obd_device *obd = class_num2obd(i);
518
519                 if (obd != NULL &&
520                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
521
522                         if (!retried) {
523                                 write_unlock(&obd_dev_lock);
524
525                                 /* the obd_device could be waited to be
526                                  * destroyed by the "obd_zombie_impexp_thread".
527                                  */
528                                 obd_zombie_barrier();
529                                 retried = true;
530                                 goto again;
531                         }
532
533                         CERROR("%s: already exists, won't add\n",
534                                obd->obd_name);
535                         /* in case we found a free slot before duplicate */
536                         minor_assign = false;
537                         ret = -EEXIST;
538                         break;
539                 }
540                 if (!minor_assign && obd == NULL) {
541                         new_obd_minor = i;
542                         minor_assign = true;
543                 }
544         }
545
546         if (minor_assign) {
547                 new_obd->obd_minor = new_obd_minor;
548                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
549                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
550                 obd_devs[new_obd_minor] = new_obd;
551         } else {
552                 if (ret == 0) {
553                         ret = -EOVERFLOW;
554                         CERROR("%s: all %u/%u devices used, increase "
555                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
556                                i, class_devno_max(), ret);
557                 }
558         }
559         write_unlock(&obd_dev_lock);
560
561         RETURN(ret);
562 }
563
564 static int class_name2dev_nolock(const char *name)
565 {
566         int i;
567
568         if (!name)
569                 return -1;
570
571         for (i = 0; i < class_devno_max(); i++) {
572                 struct obd_device *obd = class_num2obd(i);
573
574                 if (obd && strcmp(name, obd->obd_name) == 0) {
575                         /* Make sure we finished attaching before we give
576                            out any references */
577                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
578                         if (obd->obd_attached) {
579                                 return i;
580                         }
581                         break;
582                 }
583         }
584
585         return -1;
586 }
587
588 int class_name2dev(const char *name)
589 {
590         int i;
591
592         if (!name)
593                 return -1;
594
595         read_lock(&obd_dev_lock);
596         i = class_name2dev_nolock(name);
597         read_unlock(&obd_dev_lock);
598
599         return i;
600 }
601 EXPORT_SYMBOL(class_name2dev);
602
603 struct obd_device *class_name2obd(const char *name)
604 {
605         int dev = class_name2dev(name);
606
607         if (dev < 0 || dev > class_devno_max())
608                 return NULL;
609         return class_num2obd(dev);
610 }
611 EXPORT_SYMBOL(class_name2obd);
612
613 int class_uuid2dev_nolock(struct obd_uuid *uuid)
614 {
615         int i;
616
617         for (i = 0; i < class_devno_max(); i++) {
618                 struct obd_device *obd = class_num2obd(i);
619
620                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
621                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
622                         return i;
623                 }
624         }
625
626         return -1;
627 }
628
629 int class_uuid2dev(struct obd_uuid *uuid)
630 {
631         int i;
632
633         read_lock(&obd_dev_lock);
634         i = class_uuid2dev_nolock(uuid);
635         read_unlock(&obd_dev_lock);
636
637         return i;
638 }
639 EXPORT_SYMBOL(class_uuid2dev);
640
641 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
642 {
643         int dev = class_uuid2dev(uuid);
644         if (dev < 0)
645                 return NULL;
646         return class_num2obd(dev);
647 }
648 EXPORT_SYMBOL(class_uuid2obd);
649
650 /**
651  * Get obd device from ::obd_devs[]
652  *
653  * \param num [in] array index
654  *
655  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
656  *         otherwise return the obd device there.
657  */
658 struct obd_device *class_num2obd(int num)
659 {
660         struct obd_device *obd = NULL;
661
662         if (num < class_devno_max()) {
663                 obd = obd_devs[num];
664                 if (obd == NULL)
665                         return NULL;
666
667                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
668                          "%p obd_magic %08x != %08x\n",
669                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
670                 LASSERTF(obd->obd_minor == num,
671                          "%p obd_minor %0d != %0d\n",
672                          obd, obd->obd_minor, num);
673         }
674
675         return obd;
676 }
677
678 /**
679  * Find obd in obd_dev[] by name or uuid.
680  *
681  * Increment obd's refcount if found.
682  *
683  * \param[in] str obd name or uuid
684  *
685  * \retval NULL    if not found
686  * \retval target  pointer to found obd_device
687  */
688 struct obd_device *class_dev_by_str(const char *str)
689 {
690         struct obd_device *target = NULL;
691         struct obd_uuid tgtuuid;
692         int rc;
693
694         obd_str2uuid(&tgtuuid, str);
695
696         read_lock(&obd_dev_lock);
697         rc = class_uuid2dev_nolock(&tgtuuid);
698         if (rc < 0)
699                 rc = class_name2dev_nolock(str);
700
701         if (rc >= 0)
702                 target = class_num2obd(rc);
703
704         if (target != NULL)
705                 class_incref(target, "find", current);
706         read_unlock(&obd_dev_lock);
707
708         RETURN(target);
709 }
710 EXPORT_SYMBOL(class_dev_by_str);
711
712 /**
713  * Get obd devices count. Devices in any
714  *    state are counted
715  * \retval obd device count
716  */
717 int get_devices_count(void)
718 {
719         int index, max_index = class_devno_max(), dev_count = 0;
720
721         read_lock(&obd_dev_lock);
722         for (index = 0; index <= max_index; index++) {
723                 struct obd_device *obd = class_num2obd(index);
724                 if (obd != NULL)
725                         dev_count++;
726         }
727         read_unlock(&obd_dev_lock);
728
729         return dev_count;
730 }
731 EXPORT_SYMBOL(get_devices_count);
732
733 void class_obd_list(void)
734 {
735         char *status;
736         int i;
737
738         read_lock(&obd_dev_lock);
739         for (i = 0; i < class_devno_max(); i++) {
740                 struct obd_device *obd = class_num2obd(i);
741
742                 if (obd == NULL)
743                         continue;
744                 if (obd->obd_stopping)
745                         status = "ST";
746                 else if (obd->obd_set_up)
747                         status = "UP";
748                 else if (obd->obd_attached)
749                         status = "AT";
750                 else
751                         status = "--";
752                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
753                          i, status, obd->obd_type->typ_name,
754                          obd->obd_name, obd->obd_uuid.uuid,
755                          atomic_read(&obd->obd_refcount));
756         }
757         read_unlock(&obd_dev_lock);
758 }
759
760 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
761    specified, then only the client with that uuid is returned,
762    otherwise any client connected to the tgt is returned. */
763 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
764                                           const char *type_name,
765                                           struct obd_uuid *grp_uuid)
766 {
767         int i;
768
769         read_lock(&obd_dev_lock);
770         for (i = 0; i < class_devno_max(); i++) {
771                 struct obd_device *obd = class_num2obd(i);
772
773                 if (obd == NULL)
774                         continue;
775                 if ((strncmp(obd->obd_type->typ_name, type_name,
776                              strlen(type_name)) == 0)) {
777                         if (obd_uuid_equals(tgt_uuid,
778                                             &obd->u.cli.cl_target_uuid) &&
779                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
780                                                          &obd->obd_uuid) : 1)) {
781                                 read_unlock(&obd_dev_lock);
782                                 return obd;
783                         }
784                 }
785         }
786         read_unlock(&obd_dev_lock);
787
788         return NULL;
789 }
790 EXPORT_SYMBOL(class_find_client_obd);
791
792 /* Iterate over the obd_device list looking for devices that have grp_uuid.
793    Start searching at *next, and if a device is found, the next index to
794    look at is saved in *next. If next is NULL, then the first matching
795    device will always be returned. */
796 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
797 {
798         int i;
799
800         if (next == NULL)
801                 i = 0;
802         else if (*next >= 0 && *next < class_devno_max())
803                 i = *next;
804         else
805                 return NULL;
806
807         read_lock(&obd_dev_lock);
808         for (; i < class_devno_max(); i++) {
809                 struct obd_device *obd = class_num2obd(i);
810
811                 if (obd == NULL)
812                         continue;
813                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
814                         if (next != NULL)
815                                 *next = i+1;
816                         read_unlock(&obd_dev_lock);
817                         return obd;
818                 }
819         }
820         read_unlock(&obd_dev_lock);
821
822         return NULL;
823 }
824 EXPORT_SYMBOL(class_devices_in_group);
825
826 /**
827  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
828  * adjust sptlrpc settings accordingly.
829  */
830 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
831 {
832         struct obd_device  *obd;
833         const char         *type;
834         int                 i, rc = 0, rc2;
835
836         LASSERT(namelen > 0);
837
838         read_lock(&obd_dev_lock);
839         for (i = 0; i < class_devno_max(); i++) {
840                 obd = class_num2obd(i);
841
842                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
843                         continue;
844
845                 /* only notify mdc, osc, osp, lwp, mdt, ost
846                  * because only these have a -sptlrpc llog */
847                 type = obd->obd_type->typ_name;
848                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
849                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
850                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
851                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
852                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
853                     strcmp(type, LUSTRE_OST_NAME) != 0)
854                         continue;
855
856                 if (strncmp(obd->obd_name, fsname, namelen))
857                         continue;
858
859                 class_incref(obd, __FUNCTION__, obd);
860                 read_unlock(&obd_dev_lock);
861                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
862                                          sizeof(KEY_SPTLRPC_CONF),
863                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
864                 rc = rc ? rc : rc2;
865                 class_decref(obd, __FUNCTION__, obd);
866                 read_lock(&obd_dev_lock);
867         }
868         read_unlock(&obd_dev_lock);
869         return rc;
870 }
871 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
872
873 void obd_cleanup_caches(void)
874 {
875         ENTRY;
876         if (obd_device_cachep) {
877                 kmem_cache_destroy(obd_device_cachep);
878                 obd_device_cachep = NULL;
879         }
880
881         EXIT;
882 }
883
884 int obd_init_caches(void)
885 {
886         int rc;
887         ENTRY;
888
889         LASSERT(obd_device_cachep == NULL);
890         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
891                                 sizeof(struct obd_device),
892                                 0, 0, 0, sizeof(struct obd_device), NULL);
893         if (!obd_device_cachep)
894                 GOTO(out, rc = -ENOMEM);
895
896         RETURN(0);
897 out:
898         obd_cleanup_caches();
899         RETURN(rc);
900 }
901
902 static const char export_handle_owner[] = "export";
903
904 /* map connection to client */
905 struct obd_export *class_conn2export(struct lustre_handle *conn)
906 {
907         struct obd_export *export;
908         ENTRY;
909
910         if (!conn) {
911                 CDEBUG(D_CACHE, "looking for null handle\n");
912                 RETURN(NULL);
913         }
914
915         if (conn->cookie == -1) {  /* this means assign a new connection */
916                 CDEBUG(D_CACHE, "want a new connection\n");
917                 RETURN(NULL);
918         }
919
920         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
921         export = class_handle2object(conn->cookie, export_handle_owner);
922         RETURN(export);
923 }
924 EXPORT_SYMBOL(class_conn2export);
925
926 struct obd_device *class_exp2obd(struct obd_export *exp)
927 {
928         if (exp)
929                 return exp->exp_obd;
930         return NULL;
931 }
932 EXPORT_SYMBOL(class_exp2obd);
933
934 struct obd_import *class_exp2cliimp(struct obd_export *exp)
935 {
936         struct obd_device *obd = exp->exp_obd;
937         if (obd == NULL)
938                 return NULL;
939         return obd->u.cli.cl_import;
940 }
941 EXPORT_SYMBOL(class_exp2cliimp);
942
943 /* Export management functions */
944 static void class_export_destroy(struct obd_export *exp)
945 {
946         struct obd_device *obd = exp->exp_obd;
947         ENTRY;
948
949         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
950         LASSERT(obd != NULL);
951
952         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
953                exp->exp_client_uuid.uuid, obd->obd_name);
954
955         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
956         if (exp->exp_connection)
957                 ptlrpc_put_connection_superhack(exp->exp_connection);
958
959         LASSERT(list_empty(&exp->exp_outstanding_replies));
960         LASSERT(list_empty(&exp->exp_uncommitted_replies));
961         LASSERT(list_empty(&exp->exp_req_replay_queue));
962         LASSERT(list_empty(&exp->exp_hp_rpcs));
963         obd_destroy_export(exp);
964         /* self export doesn't hold a reference to an obd, although it
965          * exists until freeing of the obd */
966         if (exp != obd->obd_self_export)
967                 class_decref(obd, "export", exp);
968
969         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
970         kfree_rcu(exp, exp_handle.h_rcu);
971         EXIT;
972 }
973
974 struct obd_export *class_export_get(struct obd_export *exp)
975 {
976         refcount_inc(&exp->exp_handle.h_ref);
977         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
978                refcount_read(&exp->exp_handle.h_ref));
979         return exp;
980 }
981 EXPORT_SYMBOL(class_export_get);
982
983 void class_export_put(struct obd_export *exp)
984 {
985         LASSERT(exp != NULL);
986         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
987         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
988         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
989                refcount_read(&exp->exp_handle.h_ref) - 1);
990
991         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
992                 struct obd_device *obd = exp->exp_obd;
993
994                 CDEBUG(D_IOCTL, "final put %p/%s\n",
995                        exp, exp->exp_client_uuid.uuid);
996
997                 /* release nid stat refererence */
998                 lprocfs_exp_cleanup(exp);
999
1000                 if (exp == obd->obd_self_export) {
1001                         /* self export should be destroyed without
1002                          * zombie thread as it doesn't hold a
1003                          * reference to obd and doesn't hold any
1004                          * resources */
1005                         class_export_destroy(exp);
1006                         /* self export is destroyed, no class
1007                          * references exist and it is safe to free
1008                          * obd */
1009                         class_free_dev(obd);
1010                 } else {
1011                         LASSERT(!list_empty(&exp->exp_obd_chain));
1012                         obd_zombie_export_add(exp);
1013                 }
1014
1015         }
1016 }
1017 EXPORT_SYMBOL(class_export_put);
1018
1019 static void obd_zombie_exp_cull(struct work_struct *ws)
1020 {
1021         struct obd_export *export;
1022
1023         export = container_of(ws, struct obd_export, exp_zombie_work);
1024         class_export_destroy(export);
1025 }
1026
1027 /* Creates a new export, adds it to the hash table, and returns a
1028  * pointer to it. The refcount is 2: one for the hash reference, and
1029  * one for the pointer returned by this function. */
1030 struct obd_export *__class_new_export(struct obd_device *obd,
1031                                       struct obd_uuid *cluuid, bool is_self)
1032 {
1033         struct obd_export *export;
1034         int rc = 0;
1035         ENTRY;
1036
1037         OBD_ALLOC_PTR(export);
1038         if (!export)
1039                 return ERR_PTR(-ENOMEM);
1040
1041         export->exp_conn_cnt = 0;
1042         export->exp_lock_hash = NULL;
1043         export->exp_flock_hash = NULL;
1044         /* 2 = class_handle_hash + last */
1045         refcount_set(&export->exp_handle.h_ref, 2);
1046         atomic_set(&export->exp_rpc_count, 0);
1047         atomic_set(&export->exp_cb_count, 0);
1048         atomic_set(&export->exp_locks_count, 0);
1049 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1050         INIT_LIST_HEAD(&export->exp_locks_list);
1051         spin_lock_init(&export->exp_locks_list_guard);
1052 #endif
1053         atomic_set(&export->exp_replay_count, 0);
1054         export->exp_obd = obd;
1055         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1056         spin_lock_init(&export->exp_uncommitted_replies_lock);
1057         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1058         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1059         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1060         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1061         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1062         class_handle_hash(&export->exp_handle, export_handle_owner);
1063         export->exp_last_request_time = ktime_get_real_seconds();
1064         spin_lock_init(&export->exp_lock);
1065         spin_lock_init(&export->exp_rpc_lock);
1066         INIT_HLIST_NODE(&export->exp_nid_hash);
1067         INIT_HLIST_NODE(&export->exp_gen_hash);
1068         spin_lock_init(&export->exp_bl_list_lock);
1069         INIT_LIST_HEAD(&export->exp_bl_list);
1070         INIT_LIST_HEAD(&export->exp_stale_list);
1071         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1072
1073         export->exp_sp_peer = LUSTRE_SP_ANY;
1074         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1075         export->exp_client_uuid = *cluuid;
1076         obd_init_export(export);
1077
1078         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1079
1080         spin_lock(&obd->obd_dev_lock);
1081         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1082                 /* shouldn't happen, but might race */
1083                 if (obd->obd_stopping)
1084                         GOTO(exit_unlock, rc = -ENODEV);
1085
1086                 rc = obd_uuid_add(obd, export);
1087                 if (rc != 0) {
1088                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1089                                       obd->obd_name, cluuid->uuid, rc);
1090                         GOTO(exit_unlock, rc = -EALREADY);
1091                 }
1092         }
1093
1094         if (!is_self) {
1095                 class_incref(obd, "export", export);
1096                 list_add_tail(&export->exp_obd_chain_timed,
1097                               &obd->obd_exports_timed);
1098                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1099                 obd->obd_num_exports++;
1100         } else {
1101                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1102                 INIT_LIST_HEAD(&export->exp_obd_chain);
1103         }
1104         spin_unlock(&obd->obd_dev_lock);
1105         RETURN(export);
1106
1107 exit_unlock:
1108         spin_unlock(&obd->obd_dev_lock);
1109         class_handle_unhash(&export->exp_handle);
1110         obd_destroy_export(export);
1111         OBD_FREE_PTR(export);
1112         return ERR_PTR(rc);
1113 }
1114
1115 struct obd_export *class_new_export(struct obd_device *obd,
1116                                     struct obd_uuid *uuid)
1117 {
1118         return __class_new_export(obd, uuid, false);
1119 }
1120 EXPORT_SYMBOL(class_new_export);
1121
1122 struct obd_export *class_new_export_self(struct obd_device *obd,
1123                                          struct obd_uuid *uuid)
1124 {
1125         return __class_new_export(obd, uuid, true);
1126 }
1127
1128 void class_unlink_export(struct obd_export *exp)
1129 {
1130         class_handle_unhash(&exp->exp_handle);
1131
1132         if (exp->exp_obd->obd_self_export == exp) {
1133                 class_export_put(exp);
1134                 return;
1135         }
1136
1137         spin_lock(&exp->exp_obd->obd_dev_lock);
1138         /* delete an uuid-export hashitem from hashtables */
1139         if (exp != exp->exp_obd->obd_self_export)
1140                 obd_uuid_del(exp->exp_obd, exp);
1141
1142 #ifdef HAVE_SERVER_SUPPORT
1143         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1144                 struct tg_export_data   *ted = &exp->exp_target_data;
1145                 struct cfs_hash         *hash;
1146
1147                 /* Because obd_gen_hash will not be released until
1148                  * class_cleanup(), so hash should never be NULL here */
1149                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1150                 LASSERT(hash != NULL);
1151                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1152                              &exp->exp_gen_hash);
1153                 cfs_hash_putref(hash);
1154         }
1155 #endif /* HAVE_SERVER_SUPPORT */
1156
1157         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1158         list_del_init(&exp->exp_obd_chain_timed);
1159         exp->exp_obd->obd_num_exports--;
1160         spin_unlock(&exp->exp_obd->obd_dev_lock);
1161         atomic_inc(&obd_stale_export_num);
1162
1163         /* A reference is kept by obd_stale_exports list */
1164         obd_stale_export_put(exp);
1165 }
1166 EXPORT_SYMBOL(class_unlink_export);
1167
1168 /* Import management functions */
1169 static void obd_zombie_import_free(struct obd_import *imp)
1170 {
1171         ENTRY;
1172
1173         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1174                 imp->imp_obd->obd_name);
1175
1176         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1177
1178         ptlrpc_put_connection_superhack(imp->imp_connection);
1179
1180         while (!list_empty(&imp->imp_conn_list)) {
1181                 struct obd_import_conn *imp_conn;
1182
1183                 imp_conn = list_entry(imp->imp_conn_list.next,
1184                                       struct obd_import_conn, oic_item);
1185                 list_del_init(&imp_conn->oic_item);
1186                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1187                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1188         }
1189
1190         LASSERT(imp->imp_sec == NULL);
1191         class_decref(imp->imp_obd, "import", imp);
1192         OBD_FREE_PTR(imp);
1193         EXIT;
1194 }
1195
1196 struct obd_import *class_import_get(struct obd_import *import)
1197 {
1198         atomic_inc(&import->imp_refcount);
1199         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1200                atomic_read(&import->imp_refcount),
1201                import->imp_obd->obd_name);
1202         return import;
1203 }
1204 EXPORT_SYMBOL(class_import_get);
1205
1206 void class_import_put(struct obd_import *imp)
1207 {
1208         ENTRY;
1209
1210         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1211
1212         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1213                atomic_read(&imp->imp_refcount) - 1,
1214                imp->imp_obd->obd_name);
1215
1216         if (atomic_dec_and_test(&imp->imp_refcount)) {
1217                 CDEBUG(D_INFO, "final put import %p\n", imp);
1218                 obd_zombie_import_add(imp);
1219         }
1220
1221         EXIT;
1222 }
1223 EXPORT_SYMBOL(class_import_put);
1224
1225 static void init_imp_at(struct imp_at *at) {
1226         int i;
1227         at_init(&at->iat_net_latency, 0, 0);
1228         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1229                 /* max service estimates are tracked on the server side, so
1230                    don't use the AT history here, just use the last reported
1231                    val. (But keep hist for proc histogram, worst_ever) */
1232                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1233                         AT_FLG_NOHIST);
1234         }
1235 }
1236
1237 static void obd_zombie_imp_cull(struct work_struct *ws)
1238 {
1239         struct obd_import *import;
1240
1241         import = container_of(ws, struct obd_import, imp_zombie_work);
1242         obd_zombie_import_free(import);
1243 }
1244
1245 struct obd_import *class_new_import(struct obd_device *obd)
1246 {
1247         struct obd_import *imp;
1248         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1249
1250         OBD_ALLOC(imp, sizeof(*imp));
1251         if (imp == NULL)
1252                 return NULL;
1253
1254         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1255         INIT_LIST_HEAD(&imp->imp_replay_list);
1256         INIT_LIST_HEAD(&imp->imp_sending_list);
1257         INIT_LIST_HEAD(&imp->imp_delayed_list);
1258         INIT_LIST_HEAD(&imp->imp_committed_list);
1259         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1260         imp->imp_known_replied_xid = 0;
1261         imp->imp_replay_cursor = &imp->imp_committed_list;
1262         spin_lock_init(&imp->imp_lock);
1263         imp->imp_last_success_conn = 0;
1264         imp->imp_state = LUSTRE_IMP_NEW;
1265         imp->imp_obd = class_incref(obd, "import", imp);
1266         rwlock_init(&imp->imp_sec_lock);
1267         init_waitqueue_head(&imp->imp_recovery_waitq);
1268         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1269
1270         if (curr_pid_ns->child_reaper)
1271                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1272         else
1273                 imp->imp_sec_refpid = 1;
1274
1275         atomic_set(&imp->imp_refcount, 2);
1276         atomic_set(&imp->imp_unregistering, 0);
1277         atomic_set(&imp->imp_inflight, 0);
1278         atomic_set(&imp->imp_replay_inflight, 0);
1279         atomic_set(&imp->imp_inval_count, 0);
1280         INIT_LIST_HEAD(&imp->imp_conn_list);
1281         init_imp_at(&imp->imp_at);
1282
1283         /* the default magic is V2, will be used in connect RPC, and
1284          * then adjusted according to the flags in request/reply. */
1285         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1286
1287         return imp;
1288 }
1289 EXPORT_SYMBOL(class_new_import);
1290
1291 void class_destroy_import(struct obd_import *import)
1292 {
1293         LASSERT(import != NULL);
1294         LASSERT(import != LP_POISON);
1295
1296         spin_lock(&import->imp_lock);
1297         import->imp_generation++;
1298         spin_unlock(&import->imp_lock);
1299         class_import_put(import);
1300 }
1301 EXPORT_SYMBOL(class_destroy_import);
1302
1303 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1304
1305 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1306 {
1307         spin_lock(&exp->exp_locks_list_guard);
1308
1309         LASSERT(lock->l_exp_refs_nr >= 0);
1310
1311         if (lock->l_exp_refs_target != NULL &&
1312             lock->l_exp_refs_target != exp) {
1313                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1314                               exp, lock, lock->l_exp_refs_target);
1315         }
1316         if ((lock->l_exp_refs_nr ++) == 0) {
1317                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1318                 lock->l_exp_refs_target = exp;
1319         }
1320         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1321                lock, exp, lock->l_exp_refs_nr);
1322         spin_unlock(&exp->exp_locks_list_guard);
1323 }
1324 EXPORT_SYMBOL(__class_export_add_lock_ref);
1325
1326 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1327 {
1328         spin_lock(&exp->exp_locks_list_guard);
1329         LASSERT(lock->l_exp_refs_nr > 0);
1330         if (lock->l_exp_refs_target != exp) {
1331                 LCONSOLE_WARN("lock %p, "
1332                               "mismatching export pointers: %p, %p\n",
1333                               lock, lock->l_exp_refs_target, exp);
1334         }
1335         if (-- lock->l_exp_refs_nr == 0) {
1336                 list_del_init(&lock->l_exp_refs_link);
1337                 lock->l_exp_refs_target = NULL;
1338         }
1339         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1340                lock, exp, lock->l_exp_refs_nr);
1341         spin_unlock(&exp->exp_locks_list_guard);
1342 }
1343 EXPORT_SYMBOL(__class_export_del_lock_ref);
1344 #endif
1345
1346 /* A connection defines an export context in which preallocation can
1347    be managed. This releases the export pointer reference, and returns
1348    the export handle, so the export refcount is 1 when this function
1349    returns. */
1350 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1351                   struct obd_uuid *cluuid)
1352 {
1353         struct obd_export *export;
1354         LASSERT(conn != NULL);
1355         LASSERT(obd != NULL);
1356         LASSERT(cluuid != NULL);
1357         ENTRY;
1358
1359         export = class_new_export(obd, cluuid);
1360         if (IS_ERR(export))
1361                 RETURN(PTR_ERR(export));
1362
1363         conn->cookie = export->exp_handle.h_cookie;
1364         class_export_put(export);
1365
1366         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1367                cluuid->uuid, conn->cookie);
1368         RETURN(0);
1369 }
1370 EXPORT_SYMBOL(class_connect);
1371
1372 /* if export is involved in recovery then clean up related things */
1373 static void class_export_recovery_cleanup(struct obd_export *exp)
1374 {
1375         struct obd_device *obd = exp->exp_obd;
1376
1377         spin_lock(&obd->obd_recovery_task_lock);
1378         if (obd->obd_recovering) {
1379                 if (exp->exp_in_recovery) {
1380                         spin_lock(&exp->exp_lock);
1381                         exp->exp_in_recovery = 0;
1382                         spin_unlock(&exp->exp_lock);
1383                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1384                         atomic_dec(&obd->obd_connected_clients);
1385                 }
1386
1387                 /* if called during recovery then should update
1388                  * obd_stale_clients counter,
1389                  * lightweight exports are not counted */
1390                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1391                         exp->exp_obd->obd_stale_clients++;
1392         }
1393         spin_unlock(&obd->obd_recovery_task_lock);
1394
1395         spin_lock(&exp->exp_lock);
1396         /** Cleanup req replay fields */
1397         if (exp->exp_req_replay_needed) {
1398                 exp->exp_req_replay_needed = 0;
1399
1400                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1401                 atomic_dec(&obd->obd_req_replay_clients);
1402         }
1403
1404         /** Cleanup lock replay data */
1405         if (exp->exp_lock_replay_needed) {
1406                 exp->exp_lock_replay_needed = 0;
1407
1408                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1409                 atomic_dec(&obd->obd_lock_replay_clients);
1410         }
1411         spin_unlock(&exp->exp_lock);
1412 }
1413
1414 /* This function removes 1-3 references from the export:
1415  * 1 - for export pointer passed
1416  * and if disconnect really need
1417  * 2 - removing from hash
1418  * 3 - in client_unlink_export
1419  * The export pointer passed to this function can destroyed */
1420 int class_disconnect(struct obd_export *export)
1421 {
1422         int already_disconnected;
1423         ENTRY;
1424
1425         if (export == NULL) {
1426                 CWARN("attempting to free NULL export %p\n", export);
1427                 RETURN(-EINVAL);
1428         }
1429
1430         spin_lock(&export->exp_lock);
1431         already_disconnected = export->exp_disconnected;
1432         export->exp_disconnected = 1;
1433         /*  We hold references of export for uuid hash
1434          *  and nid_hash and export link at least. So
1435          *  it is safe to call cfs_hash_del in there.  */
1436         if (!hlist_unhashed(&export->exp_nid_hash))
1437                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1438                              &export->exp_connection->c_peer.nid,
1439                              &export->exp_nid_hash);
1440         spin_unlock(&export->exp_lock);
1441
1442         /* class_cleanup(), abort_recovery(), and class_fail_export()
1443          * all end up in here, and if any of them race we shouldn't
1444          * call extra class_export_puts(). */
1445         if (already_disconnected) {
1446                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1447                 GOTO(no_disconn, already_disconnected);
1448         }
1449
1450         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1451                export->exp_handle.h_cookie);
1452
1453         class_export_recovery_cleanup(export);
1454         class_unlink_export(export);
1455 no_disconn:
1456         class_export_put(export);
1457         RETURN(0);
1458 }
1459 EXPORT_SYMBOL(class_disconnect);
1460
1461 /* Return non-zero for a fully connected export */
1462 int class_connected_export(struct obd_export *exp)
1463 {
1464         int connected = 0;
1465
1466         if (exp) {
1467                 spin_lock(&exp->exp_lock);
1468                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1469                 spin_unlock(&exp->exp_lock);
1470         }
1471         return connected;
1472 }
1473 EXPORT_SYMBOL(class_connected_export);
1474
1475 static void class_disconnect_export_list(struct list_head *list,
1476                                          enum obd_option flags)
1477 {
1478         int rc;
1479         struct obd_export *exp;
1480         ENTRY;
1481
1482         /* It's possible that an export may disconnect itself, but
1483          * nothing else will be added to this list. */
1484         while (!list_empty(list)) {
1485                 exp = list_entry(list->next, struct obd_export,
1486                                  exp_obd_chain);
1487                 /* need for safe call CDEBUG after obd_disconnect */
1488                 class_export_get(exp);
1489
1490                 spin_lock(&exp->exp_lock);
1491                 exp->exp_flags = flags;
1492                 spin_unlock(&exp->exp_lock);
1493
1494                 if (obd_uuid_equals(&exp->exp_client_uuid,
1495                                     &exp->exp_obd->obd_uuid)) {
1496                         CDEBUG(D_HA,
1497                                "exp %p export uuid == obd uuid, don't discon\n",
1498                                exp);
1499                         /* Need to delete this now so we don't end up pointing
1500                          * to work_list later when this export is cleaned up. */
1501                         list_del_init(&exp->exp_obd_chain);
1502                         class_export_put(exp);
1503                         continue;
1504                 }
1505
1506                 class_export_get(exp);
1507                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1508                        "last request at %lld\n",
1509                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1510                        exp, exp->exp_last_request_time);
1511                 /* release one export reference anyway */
1512                 rc = obd_disconnect(exp);
1513
1514                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1515                        obd_export_nid2str(exp), exp, rc);
1516                 class_export_put(exp);
1517         }
1518         EXIT;
1519 }
1520
1521 void class_disconnect_exports(struct obd_device *obd)
1522 {
1523         struct list_head work_list;
1524         ENTRY;
1525
1526         /* Move all of the exports from obd_exports to a work list, en masse. */
1527         INIT_LIST_HEAD(&work_list);
1528         spin_lock(&obd->obd_dev_lock);
1529         list_splice_init(&obd->obd_exports, &work_list);
1530         list_splice_init(&obd->obd_delayed_exports, &work_list);
1531         spin_unlock(&obd->obd_dev_lock);
1532
1533         if (!list_empty(&work_list)) {
1534                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1535                        "disconnecting them\n", obd->obd_minor, obd);
1536                 class_disconnect_export_list(&work_list,
1537                                              exp_flags_from_obd(obd));
1538         } else
1539                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1540                        obd->obd_minor, obd);
1541         EXIT;
1542 }
1543 EXPORT_SYMBOL(class_disconnect_exports);
1544
1545 /* Remove exports that have not completed recovery.
1546  */
1547 void class_disconnect_stale_exports(struct obd_device *obd,
1548                                     int (*test_export)(struct obd_export *))
1549 {
1550         struct list_head work_list;
1551         struct obd_export *exp, *n;
1552         int evicted = 0;
1553         ENTRY;
1554
1555         INIT_LIST_HEAD(&work_list);
1556         spin_lock(&obd->obd_dev_lock);
1557         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1558                                  exp_obd_chain) {
1559                 /* don't count self-export as client */
1560                 if (obd_uuid_equals(&exp->exp_client_uuid,
1561                                     &exp->exp_obd->obd_uuid))
1562                         continue;
1563
1564                 /* don't evict clients which have no slot in last_rcvd
1565                  * (e.g. lightweight connection) */
1566                 if (exp->exp_target_data.ted_lr_idx == -1)
1567                         continue;
1568
1569                 spin_lock(&exp->exp_lock);
1570                 if (exp->exp_failed || test_export(exp)) {
1571                         spin_unlock(&exp->exp_lock);
1572                         continue;
1573                 }
1574                 exp->exp_failed = 1;
1575                 spin_unlock(&exp->exp_lock);
1576
1577                 list_move(&exp->exp_obd_chain, &work_list);
1578                 evicted++;
1579                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1580                        obd->obd_name, exp->exp_client_uuid.uuid,
1581                        obd_export_nid2str(exp));
1582                 print_export_data(exp, "EVICTING", 0, D_HA);
1583         }
1584         spin_unlock(&obd->obd_dev_lock);
1585
1586         if (evicted)
1587                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1588                               obd->obd_name, evicted);
1589
1590         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1591                                                  OBD_OPT_ABORT_RECOV);
1592         EXIT;
1593 }
1594 EXPORT_SYMBOL(class_disconnect_stale_exports);
1595
1596 void class_fail_export(struct obd_export *exp)
1597 {
1598         int rc, already_failed;
1599
1600         spin_lock(&exp->exp_lock);
1601         already_failed = exp->exp_failed;
1602         exp->exp_failed = 1;
1603         spin_unlock(&exp->exp_lock);
1604
1605         if (already_failed) {
1606                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1607                        exp, exp->exp_client_uuid.uuid);
1608                 return;
1609         }
1610
1611         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1612                exp, exp->exp_client_uuid.uuid);
1613
1614         if (obd_dump_on_timeout)
1615                 libcfs_debug_dumplog();
1616
1617         /* need for safe call CDEBUG after obd_disconnect */
1618         class_export_get(exp);
1619
1620         /* Most callers into obd_disconnect are removing their own reference
1621          * (request, for example) in addition to the one from the hash table.
1622          * We don't have such a reference here, so make one. */
1623         class_export_get(exp);
1624         rc = obd_disconnect(exp);
1625         if (rc)
1626                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1627         else
1628                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1629                        exp, exp->exp_client_uuid.uuid);
1630         class_export_put(exp);
1631 }
1632 EXPORT_SYMBOL(class_fail_export);
1633
1634 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1635 {
1636         struct cfs_hash *nid_hash;
1637         struct obd_export *doomed_exp = NULL;
1638         int exports_evicted = 0;
1639
1640         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1641
1642         spin_lock(&obd->obd_dev_lock);
1643         /* umount has run already, so evict thread should leave
1644          * its task to umount thread now */
1645         if (obd->obd_stopping) {
1646                 spin_unlock(&obd->obd_dev_lock);
1647                 return exports_evicted;
1648         }
1649         nid_hash = obd->obd_nid_hash;
1650         cfs_hash_getref(nid_hash);
1651         spin_unlock(&obd->obd_dev_lock);
1652
1653         do {
1654                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1655                 if (doomed_exp == NULL)
1656                         break;
1657
1658                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1659                          "nid %s found, wanted nid %s, requested nid %s\n",
1660                          obd_export_nid2str(doomed_exp),
1661                          libcfs_nid2str(nid_key), nid);
1662                 LASSERTF(doomed_exp != obd->obd_self_export,
1663                          "self-export is hashed by NID?\n");
1664                 exports_evicted++;
1665                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1666                               "request\n", obd->obd_name,
1667                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1668                               obd_export_nid2str(doomed_exp));
1669                 class_fail_export(doomed_exp);
1670                 class_export_put(doomed_exp);
1671         } while (1);
1672
1673         cfs_hash_putref(nid_hash);
1674
1675         if (!exports_evicted)
1676                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1677                        obd->obd_name, nid);
1678         return exports_evicted;
1679 }
1680 EXPORT_SYMBOL(obd_export_evict_by_nid);
1681
1682 #ifdef HAVE_SERVER_SUPPORT
1683 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1684 {
1685         struct obd_export *doomed_exp = NULL;
1686         struct obd_uuid doomed_uuid;
1687         int exports_evicted = 0;
1688
1689         spin_lock(&obd->obd_dev_lock);
1690         if (obd->obd_stopping) {
1691                 spin_unlock(&obd->obd_dev_lock);
1692                 return exports_evicted;
1693         }
1694         spin_unlock(&obd->obd_dev_lock);
1695
1696         obd_str2uuid(&doomed_uuid, uuid);
1697         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1698                 CERROR("%s: can't evict myself\n", obd->obd_name);
1699                 return exports_evicted;
1700         }
1701
1702         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1703         if (doomed_exp == NULL) {
1704                 CERROR("%s: can't disconnect %s: no exports found\n",
1705                        obd->obd_name, uuid);
1706         } else {
1707                 CWARN("%s: evicting %s at adminstrative request\n",
1708                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1709                 class_fail_export(doomed_exp);
1710                 class_export_put(doomed_exp);
1711                 obd_uuid_del(obd, doomed_exp);
1712                 exports_evicted++;
1713         }
1714
1715         return exports_evicted;
1716 }
1717 #endif /* HAVE_SERVER_SUPPORT */
1718
1719 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1720 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1721 EXPORT_SYMBOL(class_export_dump_hook);
1722 #endif
1723
1724 static void print_export_data(struct obd_export *exp, const char *status,
1725                               int locks, int debug_level)
1726 {
1727         struct ptlrpc_reply_state *rs;
1728         struct ptlrpc_reply_state *first_reply = NULL;
1729         int nreplies = 0;
1730
1731         spin_lock(&exp->exp_lock);
1732         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1733                             rs_exp_list) {
1734                 if (nreplies == 0)
1735                         first_reply = rs;
1736                 nreplies++;
1737         }
1738         spin_unlock(&exp->exp_lock);
1739
1740         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1741                "%p %s %llu stale:%d\n",
1742                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1743                obd_export_nid2str(exp),
1744                refcount_read(&exp->exp_handle.h_ref),
1745                atomic_read(&exp->exp_rpc_count),
1746                atomic_read(&exp->exp_cb_count),
1747                atomic_read(&exp->exp_locks_count),
1748                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1749                nreplies, first_reply, nreplies > 3 ? "..." : "",
1750                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1751 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1752         if (locks && class_export_dump_hook != NULL)
1753                 class_export_dump_hook(exp);
1754 #endif
1755 }
1756
1757 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1758 {
1759         struct obd_export *exp;
1760
1761         spin_lock(&obd->obd_dev_lock);
1762         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1763                 print_export_data(exp, "ACTIVE", locks, debug_level);
1764         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1765                 print_export_data(exp, "UNLINKED", locks, debug_level);
1766         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1767                 print_export_data(exp, "DELAYED", locks, debug_level);
1768         spin_unlock(&obd->obd_dev_lock);
1769 }
1770
1771 void obd_exports_barrier(struct obd_device *obd)
1772 {
1773         int waited = 2;
1774         LASSERT(list_empty(&obd->obd_exports));
1775         spin_lock(&obd->obd_dev_lock);
1776         while (!list_empty(&obd->obd_unlinked_exports)) {
1777                 spin_unlock(&obd->obd_dev_lock);
1778                 set_current_state(TASK_UNINTERRUPTIBLE);
1779                 schedule_timeout(cfs_time_seconds(waited));
1780                 if (waited > 5 && is_power_of_2(waited)) {
1781                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1782                                       "more than %d seconds. "
1783                                       "The obd refcount = %d. Is it stuck?\n",
1784                                       obd->obd_name, waited,
1785                                       atomic_read(&obd->obd_refcount));
1786                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1787                 }
1788                 waited *= 2;
1789                 spin_lock(&obd->obd_dev_lock);
1790         }
1791         spin_unlock(&obd->obd_dev_lock);
1792 }
1793 EXPORT_SYMBOL(obd_exports_barrier);
1794
1795 /**
1796  * Add export to the obd_zombe thread and notify it.
1797  */
1798 static void obd_zombie_export_add(struct obd_export *exp) {
1799         atomic_dec(&obd_stale_export_num);
1800         spin_lock(&exp->exp_obd->obd_dev_lock);
1801         LASSERT(!list_empty(&exp->exp_obd_chain));
1802         list_del_init(&exp->exp_obd_chain);
1803         spin_unlock(&exp->exp_obd->obd_dev_lock);
1804
1805         queue_work(zombie_wq, &exp->exp_zombie_work);
1806 }
1807
1808 /**
1809  * Add import to the obd_zombe thread and notify it.
1810  */
1811 static void obd_zombie_import_add(struct obd_import *imp) {
1812         LASSERT(imp->imp_sec == NULL);
1813
1814         queue_work(zombie_wq, &imp->imp_zombie_work);
1815 }
1816
1817 /**
1818  * wait when obd_zombie import/export queues become empty
1819  */
1820 void obd_zombie_barrier(void)
1821 {
1822         flush_workqueue(zombie_wq);
1823 }
1824 EXPORT_SYMBOL(obd_zombie_barrier);
1825
1826
1827 struct obd_export *obd_stale_export_get(void)
1828 {
1829         struct obd_export *exp = NULL;
1830         ENTRY;
1831
1832         spin_lock(&obd_stale_export_lock);
1833         if (!list_empty(&obd_stale_exports)) {
1834                 exp = list_entry(obd_stale_exports.next,
1835                                  struct obd_export, exp_stale_list);
1836                 list_del_init(&exp->exp_stale_list);
1837         }
1838         spin_unlock(&obd_stale_export_lock);
1839
1840         if (exp) {
1841                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1842                        atomic_read(&obd_stale_export_num));
1843         }
1844         RETURN(exp);
1845 }
1846 EXPORT_SYMBOL(obd_stale_export_get);
1847
1848 void obd_stale_export_put(struct obd_export *exp)
1849 {
1850         ENTRY;
1851
1852         LASSERT(list_empty(&exp->exp_stale_list));
1853         if (exp->exp_lock_hash &&
1854             atomic_read(&exp->exp_lock_hash->hs_count)) {
1855                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1856                        atomic_read(&obd_stale_export_num));
1857
1858                 spin_lock_bh(&exp->exp_bl_list_lock);
1859                 spin_lock(&obd_stale_export_lock);
1860                 /* Add to the tail if there is no blocked locks,
1861                  * to the head otherwise. */
1862                 if (list_empty(&exp->exp_bl_list))
1863                         list_add_tail(&exp->exp_stale_list,
1864                                       &obd_stale_exports);
1865                 else
1866                         list_add(&exp->exp_stale_list,
1867                                  &obd_stale_exports);
1868
1869                 spin_unlock(&obd_stale_export_lock);
1870                 spin_unlock_bh(&exp->exp_bl_list_lock);
1871         } else {
1872                 class_export_put(exp);
1873         }
1874         EXIT;
1875 }
1876 EXPORT_SYMBOL(obd_stale_export_put);
1877
1878 /**
1879  * Adjust the position of the export in the stale list,
1880  * i.e. move to the head of the list if is needed.
1881  **/
1882 void obd_stale_export_adjust(struct obd_export *exp)
1883 {
1884         LASSERT(exp != NULL);
1885         spin_lock_bh(&exp->exp_bl_list_lock);
1886         spin_lock(&obd_stale_export_lock);
1887
1888         if (!list_empty(&exp->exp_stale_list) &&
1889             !list_empty(&exp->exp_bl_list))
1890                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1891
1892         spin_unlock(&obd_stale_export_lock);
1893         spin_unlock_bh(&exp->exp_bl_list_lock);
1894 }
1895 EXPORT_SYMBOL(obd_stale_export_adjust);
1896
1897 /**
1898  * start destroy zombie import/export thread
1899  */
1900 int obd_zombie_impexp_init(void)
1901 {
1902         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1903         if (!zombie_wq)
1904                 return -ENOMEM;
1905
1906         return 0;
1907 }
1908
1909 /**
1910  * stop destroy zombie import/export thread
1911  */
1912 void obd_zombie_impexp_stop(void)
1913 {
1914         destroy_workqueue(zombie_wq);
1915         LASSERT(list_empty(&obd_stale_exports));
1916 }
1917
1918 /***** Kernel-userspace comm helpers *******/
1919
1920 /* Get length of entire message, including header */
1921 int kuc_len(int payload_len)
1922 {
1923         return sizeof(struct kuc_hdr) + payload_len;
1924 }
1925 EXPORT_SYMBOL(kuc_len);
1926
1927 /* Get a pointer to kuc header, given a ptr to the payload
1928  * @param p Pointer to payload area
1929  * @returns Pointer to kuc header
1930  */
1931 struct kuc_hdr * kuc_ptr(void *p)
1932 {
1933         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1934         LASSERT(lh->kuc_magic == KUC_MAGIC);
1935         return lh;
1936 }
1937 EXPORT_SYMBOL(kuc_ptr);
1938
1939 /* Alloc space for a message, and fill in header
1940  * @return Pointer to payload area
1941  */
1942 void *kuc_alloc(int payload_len, int transport, int type)
1943 {
1944         struct kuc_hdr *lh;
1945         int len = kuc_len(payload_len);
1946
1947         OBD_ALLOC(lh, len);
1948         if (lh == NULL)
1949                 return ERR_PTR(-ENOMEM);
1950
1951         lh->kuc_magic = KUC_MAGIC;
1952         lh->kuc_transport = transport;
1953         lh->kuc_msgtype = type;
1954         lh->kuc_msglen = len;
1955
1956         return (void *)(lh + 1);
1957 }
1958 EXPORT_SYMBOL(kuc_alloc);
1959
/* Free a message given a pointer to its payload area.
 * @param p Pointer to payload area
 * @param payload_len Payload length; the size freed is kuc_len() of it
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr = kuc_ptr(p);
        int total = kuc_len(payload_len);

        OBD_FREE(hdr, total);
}
1966 EXPORT_SYMBOL(kuc_free);
1967
1968 struct obd_request_slot_waiter {
1969         struct list_head        orsw_entry;
1970         wait_queue_head_t       orsw_waitq;
1971         bool                    orsw_signaled;
1972 };
1973
1974 static bool obd_request_slot_avail(struct client_obd *cli,
1975                                    struct obd_request_slot_waiter *orsw)
1976 {
1977         bool avail;
1978
1979         spin_lock(&cli->cl_loi_list_lock);
1980         avail = !!list_empty(&orsw->orsw_entry);
1981         spin_unlock(&cli->cl_loi_list_lock);
1982
1983         return avail;
1984 };
1985
1986 /*
1987  * For network flow control, the RPC sponsor needs to acquire a credit
1988  * before sending the RPC. The credits count for a connection is defined
1989  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1990  * the subsequent RPC sponsors need to wait until others released their
1991  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1992  */
1993 int obd_get_request_slot(struct client_obd *cli)
1994 {
1995         struct obd_request_slot_waiter   orsw;
1996         struct l_wait_info               lwi;
1997         int                              rc;
1998
1999         spin_lock(&cli->cl_loi_list_lock);
2000         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2001                 cli->cl_rpcs_in_flight++;
2002                 spin_unlock(&cli->cl_loi_list_lock);
2003                 return 0;
2004         }
2005
2006         init_waitqueue_head(&orsw.orsw_waitq);
2007         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2008         orsw.orsw_signaled = false;
2009         spin_unlock(&cli->cl_loi_list_lock);
2010
2011         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2012         rc = l_wait_event(orsw.orsw_waitq,
2013                           obd_request_slot_avail(cli, &orsw) ||
2014                           orsw.orsw_signaled,
2015                           &lwi);
2016
2017         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2018          * freed but other (such as obd_put_request_slot) is using it. */
2019         spin_lock(&cli->cl_loi_list_lock);
2020         if (rc != 0) {
2021                 if (!orsw.orsw_signaled) {
2022                         if (list_empty(&orsw.orsw_entry))
2023                                 cli->cl_rpcs_in_flight--;
2024                         else
2025                                 list_del(&orsw.orsw_entry);
2026                 }
2027         }
2028
2029         if (orsw.orsw_signaled) {
2030                 LASSERT(list_empty(&orsw.orsw_entry));
2031
2032                 rc = -EINTR;
2033         }
2034         spin_unlock(&cli->cl_loi_list_lock);
2035
2036         return rc;
2037 }
2038 EXPORT_SYMBOL(obd_get_request_slot);
2039
2040 void obd_put_request_slot(struct client_obd *cli)
2041 {
2042         struct obd_request_slot_waiter *orsw;
2043
2044         spin_lock(&cli->cl_loi_list_lock);
2045         cli->cl_rpcs_in_flight--;
2046
2047         /* If there is free slot, wakeup the first waiter. */
2048         if (!list_empty(&cli->cl_flight_waiters) &&
2049             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2050                 orsw = list_entry(cli->cl_flight_waiters.next,
2051                                   struct obd_request_slot_waiter, orsw_entry);
2052                 list_del_init(&orsw->orsw_entry);
2053                 cli->cl_rpcs_in_flight++;
2054                 wake_up(&orsw->orsw_waitq);
2055         }
2056         spin_unlock(&cli->cl_loi_list_lock);
2057 }
2058 EXPORT_SYMBOL(obd_put_request_slot);
2059
2060 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2061 {
2062         return cli->cl_max_rpcs_in_flight;
2063 }
2064 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2065
2066 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2067 {
2068         struct obd_request_slot_waiter *orsw;
2069         __u32                           old;
2070         int                             diff;
2071         int                             i;
2072         const char *type_name;
2073         int                             rc;
2074
2075         if (max > OBD_MAX_RIF_MAX || max < 1)
2076                 return -ERANGE;
2077
2078         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2079         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2080                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2081                  * strictly lower that max_rpcs_in_flight */
2082                 if (max < 2) {
2083                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2084                                "because it must be higher than "
2085                                "max_mod_rpcs_in_flight value",
2086                                cli->cl_import->imp_obd->obd_name);
2087                         return -ERANGE;
2088                 }
2089                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2090                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2091                         if (rc != 0)
2092                                 return rc;
2093                 }
2094         }
2095
2096         spin_lock(&cli->cl_loi_list_lock);
2097         old = cli->cl_max_rpcs_in_flight;
2098         cli->cl_max_rpcs_in_flight = max;
2099         client_adjust_max_dirty(cli);
2100
2101         diff = max - old;
2102
2103         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2104         for (i = 0; i < diff; i++) {
2105                 if (list_empty(&cli->cl_flight_waiters))
2106                         break;
2107
2108                 orsw = list_entry(cli->cl_flight_waiters.next,
2109                                   struct obd_request_slot_waiter, orsw_entry);
2110                 list_del_init(&orsw->orsw_entry);
2111                 cli->cl_rpcs_in_flight++;
2112                 wake_up(&orsw->orsw_waitq);
2113         }
2114         spin_unlock(&cli->cl_loi_list_lock);
2115
2116         return 0;
2117 }
2118 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2119
2120 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2121 {
2122         return cli->cl_max_mod_rpcs_in_flight;
2123 }
2124 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2125
2126 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2127 {
2128         struct obd_connect_data *ocd;
2129         __u16 maxmodrpcs;
2130         __u16 prev;
2131
2132         if (max > OBD_MAX_RIF_MAX || max < 1)
2133                 return -ERANGE;
2134
2135         /* cannot exceed or equal max_rpcs_in_flight */
2136         if (max >= cli->cl_max_rpcs_in_flight) {
2137                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2138                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2139                        cli->cl_import->imp_obd->obd_name,
2140                        max, cli->cl_max_rpcs_in_flight);
2141                 return -ERANGE;
2142         }
2143
2144         /* cannot exceed max modify RPCs in flight supported by the server */
2145         ocd = &cli->cl_import->imp_connect_data;
2146         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2147                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2148         else
2149                 maxmodrpcs = 1;
2150         if (max > maxmodrpcs) {
2151                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2152                        "higher than max_mod_rpcs_per_client value (%hu) "
2153                        "returned by the server at connection\n",
2154                        cli->cl_import->imp_obd->obd_name,
2155                        max, maxmodrpcs);
2156                 return -ERANGE;
2157         }
2158
2159         spin_lock(&cli->cl_mod_rpcs_lock);
2160
2161         prev = cli->cl_max_mod_rpcs_in_flight;
2162         cli->cl_max_mod_rpcs_in_flight = max;
2163
2164         /* wakeup waiters if limit has been increased */
2165         if (cli->cl_max_mod_rpcs_in_flight > prev)
2166                 wake_up(&cli->cl_mod_rpcs_waitq);
2167
2168         spin_unlock(&cli->cl_mod_rpcs_lock);
2169
2170         return 0;
2171 }
2172 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2173
2174 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2175                                struct seq_file *seq)
2176 {
2177         unsigned long mod_tot = 0, mod_cum;
2178         struct timespec64 now;
2179         int i;
2180
2181         ktime_get_real_ts64(&now);
2182
2183         spin_lock(&cli->cl_mod_rpcs_lock);
2184
2185         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2186                    (s64)now.tv_sec, now.tv_nsec);
2187         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2188                    cli->cl_mod_rpcs_in_flight);
2189
2190         seq_printf(seq, "\n\t\t\tmodify\n");
2191         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2192
2193         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2194
2195         mod_cum = 0;
2196         for (i = 0; i < OBD_HIST_MAX; i++) {
2197                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2198                 mod_cum += mod;
2199                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2200                            i, mod, pct(mod, mod_tot),
2201                            pct(mod_cum, mod_tot));
2202                 if (mod_cum == mod_tot)
2203                         break;
2204         }
2205
2206         spin_unlock(&cli->cl_mod_rpcs_lock);
2207
2208         return 0;
2209 }
2210 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2211
2212 /* The number of modify RPCs sent in parallel is limited
2213  * because the server has a finite number of slots per client to
2214  * store request result and ensure reply reconstruction when needed.
2215  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2216  * that takes into account server limit and cl_max_rpcs_in_flight
2217  * value.
2218  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2219  * one close request is allowed above the maximum.
2220  */
2221 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2222                                                  bool close_req)
2223 {
2224         bool avail;
2225
2226         /* A slot is available if
2227          * - number of modify RPCs in flight is less than the max
2228          * - it's a close RPC and no other close request is in flight
2229          */
2230         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2231                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2232
2233         return avail;
2234 }
2235
2236 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2237                                          bool close_req)
2238 {
2239         bool avail;
2240
2241         spin_lock(&cli->cl_mod_rpcs_lock);
2242         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2243         spin_unlock(&cli->cl_mod_rpcs_lock);
2244         return avail;
2245 }
2246
2247
2248 /* Get a modify RPC slot from the obd client @cli according
2249  * to the kind of operation @opc that is going to be sent
2250  * and the intent @it of the operation if it applies.
2251  * If the maximum number of modify RPCs in flight is reached
2252  * the thread is put to sleep.
2253  * Returns the tag to be set in the request message. Tag 0
2254  * is reserved for non-modifying requests.
2255  */
2256 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2257 {
2258         bool                    close_req = false;
2259         __u16                   i, max;
2260
2261         if (opc == MDS_CLOSE)
2262                 close_req = true;
2263
2264         do {
2265                 spin_lock(&cli->cl_mod_rpcs_lock);
2266                 max = cli->cl_max_mod_rpcs_in_flight;
2267                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2268                         /* there is a slot available */
2269                         cli->cl_mod_rpcs_in_flight++;
2270                         if (close_req)
2271                                 cli->cl_close_rpcs_in_flight++;
2272                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2273                                          cli->cl_mod_rpcs_in_flight);
2274                         /* find a free tag */
2275                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2276                                                 max + 1);
2277                         LASSERT(i < OBD_MAX_RIF_MAX);
2278                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2279                         spin_unlock(&cli->cl_mod_rpcs_lock);
2280                         /* tag 0 is reserved for non-modify RPCs */
2281
2282                         CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2283                                "opc %u, max %hu\n",
2284                                cli->cl_import->imp_obd->obd_name,
2285                                i + 1, opc, max);
2286
2287                         return i + 1;
2288                 }
2289                 spin_unlock(&cli->cl_mod_rpcs_lock);
2290
2291                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2292                        "opc %u, max %hu\n",
2293                        cli->cl_import->imp_obd->obd_name, opc, max);
2294
2295                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2296                                           obd_mod_rpc_slot_avail(cli,
2297                                                                  close_req));
2298         } while (true);
2299 }
2300 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2301
2302 /* Put a modify RPC slot from the obd client @cli according
2303  * to the kind of operation @opc that has been sent.
2304  */
2305 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2306 {
2307         bool                    close_req = false;
2308
2309         if (tag == 0)
2310                 return;
2311
2312         if (opc == MDS_CLOSE)
2313                 close_req = true;
2314
2315         spin_lock(&cli->cl_mod_rpcs_lock);
2316         cli->cl_mod_rpcs_in_flight--;
2317         if (close_req)
2318                 cli->cl_close_rpcs_in_flight--;
2319         /* release the tag in the bitmap */
2320         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2321         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2322         spin_unlock(&cli->cl_mod_rpcs_lock);
2323         wake_up(&cli->cl_mod_rpcs_waitq);
2324 }
2325 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2326