Whamcloud - gitweb
LU-3031 ldlm: disconnect speedup
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_kernelcomm.h>
48
/* Taken around every walk or modification of the obd_types list in
 * this file (class_search_type, class_register_type,
 * class_unregister_type). */
spinlock_t obd_types_lock;

/* Slab caches; created by obd_init_caches() and destroyed by
 * obd_cleanup_caches().  They hold struct obd_device, struct obdo and
 * struct obd_import objects respectively. */
static struct kmem_cache *obd_device_cachep;
struct kmem_cache *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
static struct kmem_cache *import_cachep;

/* NOTE(review): presumably the queues of imports/exports awaiting
 * deferred destruction and the lock guarding them -- their users are
 * outside this part of the file, confirm there. */
static struct list_head obd_zombie_imports;
static struct list_head obd_zombie_exports;
static spinlock_t  obd_zombie_impexp_lock;

/* Forward declarations for helpers defined later in the file. */
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* NOTE(review): stale-export bookkeeping (list, lock, counter); the code
 * using these is not visible here -- confirm semantics at the users. */
struct list_head obd_stale_exports;
spinlock_t       obd_stale_export_lock;
atomic_t         obd_stale_export_num;

/* Hook called by class_export_destroy() to drop an export's connection.
 * It is only declared and exported here; whoever assigns it lives
 * elsewhere. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
72
73 /*
74  * support functions: we could use inter-module communication, but this
75  * is more portable to other OS's
76  */
77 static struct obd_device *obd_device_alloc(void)
78 {
79         struct obd_device *obd;
80
81         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
82         if (obd != NULL) {
83                 obd->obd_magic = OBD_DEVICE_MAGIC;
84         }
85         return obd;
86 }
87
88 static void obd_device_free(struct obd_device *obd)
89 {
90         LASSERT(obd != NULL);
91         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
92                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
93         if (obd->obd_namespace != NULL) {
94                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
95                        obd, obd->obd_namespace, obd->obd_force);
96                 LBUG();
97         }
98         lu_ref_fini(&obd->obd_reference);
99         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
100 }
101
102 struct obd_type *class_search_type(const char *name)
103 {
104         struct list_head *tmp;
105         struct obd_type *type;
106
107         spin_lock(&obd_types_lock);
108         list_for_each(tmp, &obd_types) {
109                 type = list_entry(tmp, struct obd_type, typ_chain);
110                 if (strcmp(type->typ_name, name) == 0) {
111                         spin_unlock(&obd_types_lock);
112                         return type;
113                 }
114         }
115         spin_unlock(&obd_types_lock);
116         return NULL;
117 }
118 EXPORT_SYMBOL(class_search_type);
119
120 struct obd_type *class_get_type(const char *name)
121 {
122         struct obd_type *type = class_search_type(name);
123
124 #ifdef HAVE_MODULE_LOADING_SUPPORT
125         if (!type) {
126                 const char *modname = name;
127
128                 if (strcmp(modname, "obdfilter") == 0)
129                         modname = "ofd";
130
131                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
132                         modname = LUSTRE_OSP_NAME;
133
134                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
135                         modname = LUSTRE_MDT_NAME;
136
137                 if (!request_module("%s", modname)) {
138                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
139                         type = class_search_type(name);
140                 } else {
141                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
142                                            modname);
143                 }
144         }
145 #endif
146         if (type) {
147                 spin_lock(&type->obd_type_lock);
148                 type->typ_refcnt++;
149                 try_module_get(type->typ_dt_ops->o_owner);
150                 spin_unlock(&type->obd_type_lock);
151         }
152         return type;
153 }
154
155 void class_put_type(struct obd_type *type)
156 {
157         LASSERT(type);
158         spin_lock(&type->obd_type_lock);
159         type->typ_refcnt--;
160         module_put(type->typ_dt_ops->o_owner);
161         spin_unlock(&type->obd_type_lock);
162 }
163
/* Upper bound (exclusive) on the length of a type name accepted by
 * class_register_type(). */
#define CLASS_MAX_NAME 1024

/**
 * Register a new obd type.
 *
 * Private copies of \a dt_ops, \a md_ops (if given) and \a name are
 * stored in a newly allocated obd_type, which is then linked onto the
 * obd_types list.
 *
 * \param[in] dt_ops      data operations, copied into the new type
 * \param[in] md_ops      metadata operations; optional, may be NULL
 * \param[in] enable_proc register a proc directory for the type
 *                        (only when CONFIG_PROC_FS is defined)
 * \param[in] vars        proc variables passed to lprocfs_register()
 * \param[in] name        type name; must be shorter than CLASS_MAX_NAME
 * \param[in] ldt         optional lu_device_type to initialise and attach
 *
 * \retval 0        on success
 * \retval -EEXIST  a type called \a name is already registered
 * \retval -ENOMEM  an allocation failed
 * \retval negative error from lprocfs_register() or
 *                  lu_device_type_init()
 */
int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc = 0;
        ENTRY;

        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        if (class_search_type(name)) {
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        /* every failure up to the proc registration is an allocation
         * failure, so preset the return code once */
        rc = -ENOMEM;
        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(rc);

        /* allocate all three members first, then check them together so
         * the single "failed" path can free whichever ones succeeded */
        OBD_ALLOC_PTR(type->typ_dt_ops);
        OBD_ALLOC_PTR(type->typ_md_ops);
        OBD_ALLOC(type->typ_name, strlen(name) + 1);

        if (type->typ_dt_ops == NULL ||
            type->typ_md_ops == NULL ||
            type->typ_name == NULL)
                GOTO (failed, rc);

        *(type->typ_dt_ops) = *dt_ops;
        /* md_ops is optional */
        if (md_ops)
                *(type->typ_md_ops) = *md_ops;
        strcpy(type->typ_name, name);
        spin_lock_init(&type->obd_type_lock);

#ifdef CONFIG_PROC_FS
        if (enable_proc) {
                type->typ_procroot = lprocfs_register(type->typ_name,
                                                      proc_lustre_root,
                                                      vars, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* clear it so the cleanup path does not try to
                         * remove a proc entry that was never created */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        if (ldt != NULL) {
                type->typ_lu = ldt;
                rc = lu_device_type_init(ldt);
                if (rc != 0)
                        GOTO (failed, rc);
        }

        /* publish the fully initialised type */
        spin_lock(&obd_types_lock);
        list_add(&type->typ_chain, &obd_types);
        spin_unlock(&obd_types_lock);

        RETURN (0);

failed:
        /* undo whatever was set up; each member is checked because the
         * failure may have happened before it was allocated */
        if (type->typ_name != NULL) {
#ifdef CONFIG_PROC_FS
                if (type->typ_procroot != NULL)
                        remove_proc_subtree(type->typ_name, proc_lustre_root);
#endif
                OBD_FREE(type->typ_name, strlen(name) + 1);
        }
        if (type->typ_md_ops != NULL)
                OBD_FREE_PTR(type->typ_md_ops);
        if (type->typ_dt_ops != NULL)
                OBD_FREE_PTR(type->typ_dt_ops);
        OBD_FREE(type, sizeof(*type));
        RETURN(rc);
}
243 EXPORT_SYMBOL(class_register_type);
244
245 int class_unregister_type(const char *name)
246 {
247         struct obd_type *type = class_search_type(name);
248         ENTRY;
249
250         if (!type) {
251                 CERROR("unknown obd type\n");
252                 RETURN(-EINVAL);
253         }
254
255         if (type->typ_refcnt) {
256                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
257                 /* This is a bad situation, let's make the best of it */
258                 /* Remove ops, but leave the name for debugging */
259                 OBD_FREE_PTR(type->typ_dt_ops);
260                 OBD_FREE_PTR(type->typ_md_ops);
261                 RETURN(-EBUSY);
262         }
263
264         /* we do not use type->typ_procroot as for compatibility purposes
265          * other modules can share names (i.e. lod can use lov entry). so
266          * we can't reference pointer as it can get invalided when another
267          * module removes the entry */
268 #ifdef CONFIG_PROC_FS
269         if (type->typ_procroot != NULL)
270                 remove_proc_subtree(type->typ_name, proc_lustre_root);
271         if (type->typ_procsym != NULL)
272                 lprocfs_remove(&type->typ_procsym);
273 #endif
274         if (type->typ_lu)
275                 lu_device_type_fini(type->typ_lu);
276
277         spin_lock(&obd_types_lock);
278         list_del(&type->typ_chain);
279         spin_unlock(&obd_types_lock);
280         OBD_FREE(type->typ_name, strlen(name) + 1);
281         if (type->typ_dt_ops != NULL)
282                 OBD_FREE_PTR(type->typ_dt_ops);
283         if (type->typ_md_ops != NULL)
284                 OBD_FREE_PTR(type->typ_md_ops);
285         OBD_FREE(type, sizeof(*type));
286         RETURN(0);
287 } /* class_unregister_type */
288 EXPORT_SYMBOL(class_unregister_type);
289
290 /**
291  * Create a new obd device.
292  *
293  * Find an empty slot in ::obd_devs[], create a new obd device in it.
294  *
295  * \param[in] type_name obd device type string.
296  * \param[in] name      obd device name.
297  *
298  * \retval NULL if create fails, otherwise return the obd device
299  *         pointer created.
300  */
301 struct obd_device *class_newdev(const char *type_name, const char *name)
302 {
303         struct obd_device *result = NULL;
304         struct obd_device *newdev;
305         struct obd_type *type = NULL;
306         int i;
307         int new_obd_minor = 0;
308         ENTRY;
309
310         if (strlen(name) >= MAX_OBD_NAME) {
311                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
312                 RETURN(ERR_PTR(-EINVAL));
313         }
314
315         type = class_get_type(type_name);
316         if (type == NULL){
317                 CERROR("OBD: unknown type: %s\n", type_name);
318                 RETURN(ERR_PTR(-ENODEV));
319         }
320
321         newdev = obd_device_alloc();
322         if (newdev == NULL)
323                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
324
325         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
326
327         write_lock(&obd_dev_lock);
328         for (i = 0; i < class_devno_max(); i++) {
329                 struct obd_device *obd = class_num2obd(i);
330
331                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
332                         CERROR("Device %s already exists at %d, won't add\n",
333                                name, i);
334                         if (result) {
335                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
336                                          "%p obd_magic %08x != %08x\n", result,
337                                          result->obd_magic, OBD_DEVICE_MAGIC);
338                                 LASSERTF(result->obd_minor == new_obd_minor,
339                                          "%p obd_minor %d != %d\n", result,
340                                          result->obd_minor, new_obd_minor);
341
342                                 obd_devs[result->obd_minor] = NULL;
343                                 result->obd_name[0]='\0';
344                          }
345                         result = ERR_PTR(-EEXIST);
346                         break;
347                 }
348                 if (!result && !obd) {
349                         result = newdev;
350                         result->obd_minor = i;
351                         new_obd_minor = i;
352                         result->obd_type = type;
353                         strncpy(result->obd_name, name,
354                                 sizeof(result->obd_name) - 1);
355                         obd_devs[i] = result;
356                 }
357         }
358         write_unlock(&obd_dev_lock);
359
360         if (result == NULL && i >= class_devno_max()) {
361                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
362                        class_devno_max());
363                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
364         }
365
366         if (IS_ERR(result))
367                 GOTO(out, result);
368
369         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
370                result->obd_name, result);
371
372         RETURN(result);
373 out:
374         obd_device_free(newdev);
375 out_type:
376         class_put_type(type);
377         return result;
378 }
379
380 void class_release_dev(struct obd_device *obd)
381 {
382         struct obd_type *obd_type = obd->obd_type;
383
384         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
385                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
386         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
387                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
388         LASSERT(obd_type != NULL);
389
390         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
391                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
392
393         write_lock(&obd_dev_lock);
394         obd_devs[obd->obd_minor] = NULL;
395         write_unlock(&obd_dev_lock);
396         obd_device_free(obd);
397
398         class_put_type(obd_type);
399 }
400
401 int class_name2dev(const char *name)
402 {
403         int i;
404
405         if (!name)
406                 return -1;
407
408         read_lock(&obd_dev_lock);
409         for (i = 0; i < class_devno_max(); i++) {
410                 struct obd_device *obd = class_num2obd(i);
411
412                 if (obd && strcmp(name, obd->obd_name) == 0) {
413                         /* Make sure we finished attaching before we give
414                            out any references */
415                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
416                         if (obd->obd_attached) {
417                                 read_unlock(&obd_dev_lock);
418                                 return i;
419                         }
420                         break;
421                 }
422         }
423         read_unlock(&obd_dev_lock);
424
425         return -1;
426 }
427
428 struct obd_device *class_name2obd(const char *name)
429 {
430         int dev = class_name2dev(name);
431
432         if (dev < 0 || dev > class_devno_max())
433                 return NULL;
434         return class_num2obd(dev);
435 }
436 EXPORT_SYMBOL(class_name2obd);
437
438 int class_uuid2dev(struct obd_uuid *uuid)
439 {
440         int i;
441
442         read_lock(&obd_dev_lock);
443         for (i = 0; i < class_devno_max(); i++) {
444                 struct obd_device *obd = class_num2obd(i);
445
446                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
447                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
448                         read_unlock(&obd_dev_lock);
449                         return i;
450                 }
451         }
452         read_unlock(&obd_dev_lock);
453
454         return -1;
455 }
456
457 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
458 {
459         int dev = class_uuid2dev(uuid);
460         if (dev < 0)
461                 return NULL;
462         return class_num2obd(dev);
463 }
464 EXPORT_SYMBOL(class_uuid2obd);
465
466 /**
467  * Get obd device from ::obd_devs[]
468  *
469  * \param num [in] array index
470  *
471  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
472  *         otherwise return the obd device there.
473  */
474 struct obd_device *class_num2obd(int num)
475 {
476         struct obd_device *obd = NULL;
477
478         if (num < class_devno_max()) {
479                 obd = obd_devs[num];
480                 if (obd == NULL)
481                         return NULL;
482
483                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
484                          "%p obd_magic %08x != %08x\n",
485                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
486                 LASSERTF(obd->obd_minor == num,
487                          "%p obd_minor %0d != %0d\n",
488                          obd, obd->obd_minor, num);
489         }
490
491         return obd;
492 }
493
494 /**
495  * Get obd devices count. Device in any
496  *    state are counted
497  * \retval obd device count
498  */
499 int get_devices_count(void)
500 {
501         int index, max_index = class_devno_max(), dev_count = 0;
502
503         read_lock(&obd_dev_lock);
504         for (index = 0; index <= max_index; index++) {
505                 struct obd_device *obd = class_num2obd(index);
506                 if (obd != NULL)
507                         dev_count++;
508         }
509         read_unlock(&obd_dev_lock);
510
511         return dev_count;
512 }
513 EXPORT_SYMBOL(get_devices_count);
514
515 void class_obd_list(void)
516 {
517         char *status;
518         int i;
519
520         read_lock(&obd_dev_lock);
521         for (i = 0; i < class_devno_max(); i++) {
522                 struct obd_device *obd = class_num2obd(i);
523
524                 if (obd == NULL)
525                         continue;
526                 if (obd->obd_stopping)
527                         status = "ST";
528                 else if (obd->obd_set_up)
529                         status = "UP";
530                 else if (obd->obd_attached)
531                         status = "AT";
532                 else
533                         status = "--";
534                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
535                          i, status, obd->obd_type->typ_name,
536                          obd->obd_name, obd->obd_uuid.uuid,
537                          atomic_read(&obd->obd_refcount));
538         }
539         read_unlock(&obd_dev_lock);
540         return;
541 }
542
543 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
544    specified, then only the client with that uuid is returned,
545    otherwise any client connected to the tgt is returned. */
546 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
547                                           const char * typ_name,
548                                           struct obd_uuid *grp_uuid)
549 {
550         int i;
551
552         read_lock(&obd_dev_lock);
553         for (i = 0; i < class_devno_max(); i++) {
554                 struct obd_device *obd = class_num2obd(i);
555
556                 if (obd == NULL)
557                         continue;
558                 if ((strncmp(obd->obd_type->typ_name, typ_name,
559                              strlen(typ_name)) == 0)) {
560                         if (obd_uuid_equals(tgt_uuid,
561                                             &obd->u.cli.cl_target_uuid) &&
562                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
563                                                          &obd->obd_uuid) : 1)) {
564                                 read_unlock(&obd_dev_lock);
565                                 return obd;
566                         }
567                 }
568         }
569         read_unlock(&obd_dev_lock);
570
571         return NULL;
572 }
573 EXPORT_SYMBOL(class_find_client_obd);
574
575 /* Iterate the obd_device list looking devices have grp_uuid. Start
576    searching at *next, and if a device is found, the next index to look
577    at is saved in *next. If next is NULL, then the first matching device
578    will always be returned. */
579 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
580 {
581         int i;
582
583         if (next == NULL)
584                 i = 0;
585         else if (*next >= 0 && *next < class_devno_max())
586                 i = *next;
587         else
588                 return NULL;
589
590         read_lock(&obd_dev_lock);
591         for (; i < class_devno_max(); i++) {
592                 struct obd_device *obd = class_num2obd(i);
593
594                 if (obd == NULL)
595                         continue;
596                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
597                         if (next != NULL)
598                                 *next = i+1;
599                         read_unlock(&obd_dev_lock);
600                         return obd;
601                 }
602         }
603         read_unlock(&obd_dev_lock);
604
605         return NULL;
606 }
607 EXPORT_SYMBOL(class_devices_in_group);
608
609 /**
610  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
611  * adjust sptlrpc settings accordingly.
612  */
613 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
614 {
615         struct obd_device  *obd;
616         const char         *type;
617         int                 i, rc = 0, rc2;
618
619         LASSERT(namelen > 0);
620
621         read_lock(&obd_dev_lock);
622         for (i = 0; i < class_devno_max(); i++) {
623                 obd = class_num2obd(i);
624
625                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
626                         continue;
627
628                 /* only notify mdc, osc, mdt, ost */
629                 type = obd->obd_type->typ_name;
630                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
631                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
632                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
633                     strcmp(type, LUSTRE_OST_NAME) != 0)
634                         continue;
635
636                 if (strncmp(obd->obd_name, fsname, namelen))
637                         continue;
638
639                 class_incref(obd, __FUNCTION__, obd);
640                 read_unlock(&obd_dev_lock);
641                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
642                                          sizeof(KEY_SPTLRPC_CONF),
643                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
644                 rc = rc ? rc : rc2;
645                 class_decref(obd, __FUNCTION__, obd);
646                 read_lock(&obd_dev_lock);
647         }
648         read_unlock(&obd_dev_lock);
649         return rc;
650 }
651 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
652
653 void obd_cleanup_caches(void)
654 {
655         ENTRY;
656         if (obd_device_cachep) {
657                 kmem_cache_destroy(obd_device_cachep);
658                 obd_device_cachep = NULL;
659         }
660         if (obdo_cachep) {
661                 kmem_cache_destroy(obdo_cachep);
662                 obdo_cachep = NULL;
663         }
664         if (import_cachep) {
665                 kmem_cache_destroy(import_cachep);
666                 import_cachep = NULL;
667         }
668
669         EXIT;
670 }
671
672 int obd_init_caches(void)
673 {
674         int rc;
675         ENTRY;
676
677         LASSERT(obd_device_cachep == NULL);
678         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
679                                               sizeof(struct obd_device),
680                                               0, 0, NULL);
681         if (!obd_device_cachep)
682                 GOTO(out, rc = -ENOMEM);
683
684         LASSERT(obdo_cachep == NULL);
685         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
686                                         0, 0, NULL);
687         if (!obdo_cachep)
688                 GOTO(out, rc = -ENOMEM);
689
690         LASSERT(import_cachep == NULL);
691         import_cachep = kmem_cache_create("ll_import_cache",
692                                           sizeof(struct obd_import),
693                                           0, 0, NULL);
694         if (!import_cachep)
695                 GOTO(out, rc = -ENOMEM);
696
697         RETURN(0);
698 out:
699         obd_cleanup_caches();
700         RETURN(rc);
701 }
702
703 /* map connection to client */
704 struct obd_export *class_conn2export(struct lustre_handle *conn)
705 {
706         struct obd_export *export;
707         ENTRY;
708
709         if (!conn) {
710                 CDEBUG(D_CACHE, "looking for null handle\n");
711                 RETURN(NULL);
712         }
713
714         if (conn->cookie == -1) {  /* this means assign a new connection */
715                 CDEBUG(D_CACHE, "want a new connection\n");
716                 RETURN(NULL);
717         }
718
719         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
720         export = class_handle2object(conn->cookie, NULL);
721         RETURN(export);
722 }
723 EXPORT_SYMBOL(class_conn2export);
724
725 struct obd_device *class_exp2obd(struct obd_export *exp)
726 {
727         if (exp)
728                 return exp->exp_obd;
729         return NULL;
730 }
731 EXPORT_SYMBOL(class_exp2obd);
732
733 struct obd_device *class_conn2obd(struct lustre_handle *conn)
734 {
735         struct obd_export *export;
736         export = class_conn2export(conn);
737         if (export) {
738                 struct obd_device *obd = export->exp_obd;
739                 class_export_put(export);
740                 return obd;
741         }
742         return NULL;
743 }
744
745 struct obd_import *class_exp2cliimp(struct obd_export *exp)
746 {
747         struct obd_device *obd = exp->exp_obd;
748         if (obd == NULL)
749                 return NULL;
750         return obd->u.cli.cl_import;
751 }
752 EXPORT_SYMBOL(class_exp2cliimp);
753
754 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
755 {
756         struct obd_device *obd = class_conn2obd(conn);
757         if (obd == NULL)
758                 return NULL;
759         return obd->u.cli.cl_import;
760 }
761
762 /* Export management functions */
763 static void class_export_destroy(struct obd_export *exp)
764 {
765         struct obd_device *obd = exp->exp_obd;
766         ENTRY;
767
768         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
769         LASSERT(obd != NULL);
770
771         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
772                exp->exp_client_uuid.uuid, obd->obd_name);
773
774         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
775         if (exp->exp_connection)
776                 ptlrpc_put_connection_superhack(exp->exp_connection);
777
778         LASSERT(list_empty(&exp->exp_outstanding_replies));
779         LASSERT(list_empty(&exp->exp_uncommitted_replies));
780         LASSERT(list_empty(&exp->exp_req_replay_queue));
781         LASSERT(list_empty(&exp->exp_hp_rpcs));
782         obd_destroy_export(exp);
783         class_decref(obd, "export", exp);
784
785         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
786         EXIT;
787 }
788
/* Handle addref callback for exports: takes a reference on the export
 * through class_export_get(). */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}

/* Handle operations attached to each export's exp_handle by
 * class_handle_hash() in class_new_export().  No hop_free callback is
 * provided. */
static struct portals_handle_ops export_handle_ops = {
        .hop_addref = export_handle_addref,
        .hop_free   = NULL,
};
798
799 struct obd_export *class_export_get(struct obd_export *exp)
800 {
801         atomic_inc(&exp->exp_refcount);
802         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
803                atomic_read(&exp->exp_refcount));
804         return exp;
805 }
806 EXPORT_SYMBOL(class_export_get);
807
808 void class_export_put(struct obd_export *exp)
809 {
810         LASSERT(exp != NULL);
811         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
812         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
813                atomic_read(&exp->exp_refcount) - 1);
814
815         if (atomic_dec_and_test(&exp->exp_refcount)) {
816                 LASSERT(!list_empty(&exp->exp_obd_chain));
817                 LASSERT(list_empty(&exp->exp_stale_list));
818                 CDEBUG(D_IOCTL, "final put %p/%s\n",
819                        exp, exp->exp_client_uuid.uuid);
820
821                 /* release nid stat refererence */
822                 lprocfs_exp_cleanup(exp);
823
824                 obd_zombie_export_add(exp);
825         }
826 }
827 EXPORT_SYMBOL(class_export_put);
828
/* Creates a new export, adds it to the hash table, and returns a
 * pointer to it. The refcount is 2: one for the hash reference, and
 * one for the pointer returned by this function.
 *
 * Unless cluuid equals the obd's own uuid, the export is also inserted
 * into the obd's uuid hash, and a second export for the same uuid is
 * refused.
 *
 * Returns the new export, or an ERR_PTR():
 *   -ENOMEM   the export could not be allocated
 *   -ENODEV   the obd is stopping or has no uuid hash
 *   -EALREADY an export for cluuid already exists in the uuid hash
 */
struct obd_export *class_new_export(struct obd_device *obd,
                                    struct obd_uuid *cluuid)
{
        struct obd_export *export;
        struct cfs_hash *hash = NULL;
        int rc = 0;
        ENTRY;

        OBD_ALLOC_PTR(export);
        if (!export)
                return ERR_PTR(-ENOMEM);

        /* initialise every field before the export becomes reachable */
        export->exp_conn_cnt = 0;
        export->exp_lock_hash = NULL;
        export->exp_flock_hash = NULL;
        atomic_set(&export->exp_refcount, 2);
        atomic_set(&export->exp_rpc_count, 0);
        atomic_set(&export->exp_cb_count, 0);
        atomic_set(&export->exp_locks_count, 0);
#if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&export->exp_locks_list);
        spin_lock_init(&export->exp_locks_list_guard);
#endif
        atomic_set(&export->exp_replay_count, 0);
        export->exp_obd = obd;
        INIT_LIST_HEAD(&export->exp_outstanding_replies);
        spin_lock_init(&export->exp_uncommitted_replies_lock);
        INIT_LIST_HEAD(&export->exp_uncommitted_replies);
        INIT_LIST_HEAD(&export->exp_req_replay_queue);
        INIT_LIST_HEAD(&export->exp_handle.h_link);
        INIT_LIST_HEAD(&export->exp_hp_rpcs);
        INIT_LIST_HEAD(&export->exp_reg_rpcs);
        /* register the handle (undone by class_handle_unhash() on the
         * error path below) */
        class_handle_hash(&export->exp_handle, &export_handle_ops);
        export->exp_last_request_time = cfs_time_current_sec();
        spin_lock_init(&export->exp_lock);
        spin_lock_init(&export->exp_rpc_lock);
        INIT_HLIST_NODE(&export->exp_uuid_hash);
        INIT_HLIST_NODE(&export->exp_nid_hash);
        INIT_HLIST_NODE(&export->exp_gen_hash);
        spin_lock_init(&export->exp_bl_list_lock);
        INIT_LIST_HEAD(&export->exp_bl_list);
        INIT_LIST_HEAD(&export->exp_stale_list);

        export->exp_sp_peer = LUSTRE_SP_ANY;
        export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
        export->exp_client_uuid = *cluuid;
        obd_init_export(export);

        /* first check under obd_dev_lock: grab a reference on the uuid
         * hash while the obd is known not to be stopping */
        spin_lock(&obd->obd_dev_lock);
        /* shouldn't happen, but might race */
        if (obd->obd_stopping)
                GOTO(exit_unlock, rc = -ENODEV);

        hash = cfs_hash_getref(obd->obd_uuid_hash);
        if (hash == NULL)
                GOTO(exit_unlock, rc = -ENODEV);
        spin_unlock(&obd->obd_dev_lock);

        /* the hash insertion is done with obd_dev_lock dropped; an export
         * using the obd's own uuid is not put into the uuid hash */
        if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
                rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
                if (rc != 0) {
                        LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
                                      obd->obd_name, cluuid->uuid, rc);
                        GOTO(exit_err, rc = -EALREADY);
                }
        }

        at_init(&export->exp_bl_lock_at, obd_timeout, 0);
        /* second check: the obd may have started stopping while the lock
         * was dropped, in which case the hash insertion is undone */
        spin_lock(&obd->obd_dev_lock);
        if (obd->obd_stopping) {
                cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
                GOTO(exit_unlock, rc = -ENODEV);
        }

        /* link the export into the obd's lists and account for it */
        class_incref(obd, "export", export);
        list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
        list_add_tail(&export->exp_obd_chain_timed,
                      &export->exp_obd->obd_exports_timed);
        export->exp_obd->obd_num_exports++;
        spin_unlock(&obd->obd_dev_lock);
        cfs_hash_putref(hash);
        RETURN(export);

exit_unlock:
        spin_unlock(&obd->obd_dev_lock);
exit_err:
        /* hash is still NULL if the failure came before cfs_hash_getref() */
        if (hash)
                cfs_hash_putref(hash);
        class_handle_unhash(&export->exp_handle);
        LASSERT(hlist_unhashed(&export->exp_uuid_hash));
        obd_destroy_export(export);
        OBD_FREE_PTR(export);
        return ERR_PTR(rc);
}
926 EXPORT_SYMBOL(class_new_export);
927
928 void class_unlink_export(struct obd_export *exp)
929 {
930         class_handle_unhash(&exp->exp_handle);
931
932         spin_lock(&exp->exp_obd->obd_dev_lock);
933         /* delete an uuid-export hashitem from hashtables */
934         if (!hlist_unhashed(&exp->exp_uuid_hash))
935                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
936                              &exp->exp_client_uuid,
937                              &exp->exp_uuid_hash);
938
939         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
940         list_del_init(&exp->exp_obd_chain_timed);
941         exp->exp_obd->obd_num_exports--;
942         spin_unlock(&exp->exp_obd->obd_dev_lock);
943         atomic_inc(&obd_stale_export_num);
944
945         /* A reference is kept by obd_stale_exports list */
946         obd_stale_export_put(exp);
947 }
948
949 /* Import management functions */
950 static void class_import_destroy(struct obd_import *imp)
951 {
952         ENTRY;
953
954         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
955                 imp->imp_obd->obd_name);
956
957         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
958
959         ptlrpc_put_connection_superhack(imp->imp_connection);
960
961         while (!list_empty(&imp->imp_conn_list)) {
962                 struct obd_import_conn *imp_conn;
963
964                 imp_conn = list_entry(imp->imp_conn_list.next,
965                                       struct obd_import_conn, oic_item);
966                 list_del_init(&imp_conn->oic_item);
967                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
968                 OBD_FREE(imp_conn, sizeof(*imp_conn));
969         }
970
971         LASSERT(imp->imp_sec == NULL);
972         class_decref(imp->imp_obd, "import", imp);
973         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
974         EXIT;
975 }
976
/* Handle-table callback: take a reference on the import behind a handle. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
981
982 static struct portals_handle_ops import_handle_ops = {
983         .hop_addref = import_handle_addref,
984         .hop_free   = NULL,
985 };
986
987 struct obd_import *class_import_get(struct obd_import *import)
988 {
989         atomic_inc(&import->imp_refcount);
990         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
991                atomic_read(&import->imp_refcount),
992                import->imp_obd->obd_name);
993         return import;
994 }
995 EXPORT_SYMBOL(class_import_get);
996
997 void class_import_put(struct obd_import *imp)
998 {
999         ENTRY;
1000
1001         LASSERT(list_empty(&imp->imp_zombie_chain));
1002         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1003
1004         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1005                atomic_read(&imp->imp_refcount) - 1,
1006                imp->imp_obd->obd_name);
1007
1008         if (atomic_dec_and_test(&imp->imp_refcount)) {
1009                 CDEBUG(D_INFO, "final put import %p\n", imp);
1010                 obd_zombie_import_add(imp);
1011         }
1012
1013         /* catch possible import put race */
1014         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1015         EXIT;
1016 }
1017 EXPORT_SYMBOL(class_import_put);
1018
1019 static void init_imp_at(struct imp_at *at) {
1020         int i;
1021         at_init(&at->iat_net_latency, 0, 0);
1022         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1023                 /* max service estimates are tracked on the server side, so
1024                    don't use the AT history here, just use the last reported
1025                    val. (But keep hist for proc histogram, worst_ever) */
1026                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1027                         AT_FLG_NOHIST);
1028         }
1029 }
1030
1031 struct obd_import *class_new_import(struct obd_device *obd)
1032 {
1033         struct obd_import *imp;
1034
1035         OBD_ALLOC(imp, sizeof(*imp));
1036         if (imp == NULL)
1037                 return NULL;
1038
1039         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1040         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1041         INIT_LIST_HEAD(&imp->imp_replay_list);
1042         INIT_LIST_HEAD(&imp->imp_sending_list);
1043         INIT_LIST_HEAD(&imp->imp_delayed_list);
1044         INIT_LIST_HEAD(&imp->imp_committed_list);
1045         imp->imp_replay_cursor = &imp->imp_committed_list;
1046         spin_lock_init(&imp->imp_lock);
1047         imp->imp_last_success_conn = 0;
1048         imp->imp_state = LUSTRE_IMP_NEW;
1049         imp->imp_obd = class_incref(obd, "import", imp);
1050         mutex_init(&imp->imp_sec_mutex);
1051         init_waitqueue_head(&imp->imp_recovery_waitq);
1052
1053         atomic_set(&imp->imp_refcount, 2);
1054         atomic_set(&imp->imp_unregistering, 0);
1055         atomic_set(&imp->imp_inflight, 0);
1056         atomic_set(&imp->imp_replay_inflight, 0);
1057         atomic_set(&imp->imp_inval_count, 0);
1058         INIT_LIST_HEAD(&imp->imp_conn_list);
1059         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1060         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1061         init_imp_at(&imp->imp_at);
1062
1063         /* the default magic is V2, will be used in connect RPC, and
1064          * then adjusted according to the flags in request/reply. */
1065         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1066
1067         return imp;
1068 }
1069 EXPORT_SYMBOL(class_new_import);
1070
1071 void class_destroy_import(struct obd_import *import)
1072 {
1073         LASSERT(import != NULL);
1074         LASSERT(import != LP_POISON);
1075
1076         class_handle_unhash(&import->imp_handle);
1077
1078         spin_lock(&import->imp_lock);
1079         import->imp_generation++;
1080         spin_unlock(&import->imp_lock);
1081         class_import_put(import);
1082 }
1083 EXPORT_SYMBOL(class_destroy_import);
1084
1085 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1086
1087 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1088 {
1089         spin_lock(&exp->exp_locks_list_guard);
1090
1091         LASSERT(lock->l_exp_refs_nr >= 0);
1092
1093         if (lock->l_exp_refs_target != NULL &&
1094             lock->l_exp_refs_target != exp) {
1095                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1096                               exp, lock, lock->l_exp_refs_target);
1097         }
1098         if ((lock->l_exp_refs_nr ++) == 0) {
1099                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1100                 lock->l_exp_refs_target = exp;
1101         }
1102         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1103                lock, exp, lock->l_exp_refs_nr);
1104         spin_unlock(&exp->exp_locks_list_guard);
1105 }
1106
1107 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1108 {
1109         spin_lock(&exp->exp_locks_list_guard);
1110         LASSERT(lock->l_exp_refs_nr > 0);
1111         if (lock->l_exp_refs_target != exp) {
1112                 LCONSOLE_WARN("lock %p, "
1113                               "mismatching export pointers: %p, %p\n",
1114                               lock, lock->l_exp_refs_target, exp);
1115         }
1116         if (-- lock->l_exp_refs_nr == 0) {
1117                 list_del_init(&lock->l_exp_refs_link);
1118                 lock->l_exp_refs_target = NULL;
1119         }
1120         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1121                lock, exp, lock->l_exp_refs_nr);
1122         spin_unlock(&exp->exp_locks_list_guard);
1123 }
1124 #endif
1125
1126 /* A connection defines an export context in which preallocation can
1127    be managed. This releases the export pointer reference, and returns
1128    the export handle, so the export refcount is 1 when this function
1129    returns. */
1130 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1131                   struct obd_uuid *cluuid)
1132 {
1133         struct obd_export *export;
1134         LASSERT(conn != NULL);
1135         LASSERT(obd != NULL);
1136         LASSERT(cluuid != NULL);
1137         ENTRY;
1138
1139         export = class_new_export(obd, cluuid);
1140         if (IS_ERR(export))
1141                 RETURN(PTR_ERR(export));
1142
1143         conn->cookie = export->exp_handle.h_cookie;
1144         class_export_put(export);
1145
1146         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1147                cluuid->uuid, conn->cookie);
1148         RETURN(0);
1149 }
1150 EXPORT_SYMBOL(class_connect);
1151
1152 /* if export is involved in recovery then clean up related things */
1153 static void class_export_recovery_cleanup(struct obd_export *exp)
1154 {
1155         struct obd_device *obd = exp->exp_obd;
1156
1157         spin_lock(&obd->obd_recovery_task_lock);
1158         if (obd->obd_recovering) {
1159                 if (exp->exp_in_recovery) {
1160                         spin_lock(&exp->exp_lock);
1161                         exp->exp_in_recovery = 0;
1162                         spin_unlock(&exp->exp_lock);
1163                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1164                         atomic_dec(&obd->obd_connected_clients);
1165                 }
1166
1167                 /* if called during recovery then should update
1168                  * obd_stale_clients counter,
1169                  * lightweight exports are not counted */
1170                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1171                         exp->exp_obd->obd_stale_clients++;
1172         }
1173         spin_unlock(&obd->obd_recovery_task_lock);
1174
1175         spin_lock(&exp->exp_lock);
1176         /** Cleanup req replay fields */
1177         if (exp->exp_req_replay_needed) {
1178                 exp->exp_req_replay_needed = 0;
1179
1180                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1181                 atomic_dec(&obd->obd_req_replay_clients);
1182         }
1183
1184         /** Cleanup lock replay data */
1185         if (exp->exp_lock_replay_needed) {
1186                 exp->exp_lock_replay_needed = 0;
1187
1188                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1189                 atomic_dec(&obd->obd_lock_replay_clients);
1190         }
1191         spin_unlock(&exp->exp_lock);
1192 }
1193
1194 /* This function removes 1-3 references from the export:
1195  * 1 - for export pointer passed
1196  * and if disconnect really need
1197  * 2 - removing from hash
1198  * 3 - in client_unlink_export
1199  * The export pointer passed to this function can destroyed */
1200 int class_disconnect(struct obd_export *export)
1201 {
1202         int already_disconnected;
1203         ENTRY;
1204
1205         if (export == NULL) {
1206                 CWARN("attempting to free NULL export %p\n", export);
1207                 RETURN(-EINVAL);
1208         }
1209
1210         spin_lock(&export->exp_lock);
1211         already_disconnected = export->exp_disconnected;
1212         export->exp_disconnected = 1;
1213         spin_unlock(&export->exp_lock);
1214
1215         /* class_cleanup(), abort_recovery(), and class_fail_export()
1216          * all end up in here, and if any of them race we shouldn't
1217          * call extra class_export_puts(). */
1218         if (already_disconnected) {
1219                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1220                 GOTO(no_disconn, already_disconnected);
1221         }
1222
1223         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1224                export->exp_handle.h_cookie);
1225
1226         if (!hlist_unhashed(&export->exp_nid_hash))
1227                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1228                              &export->exp_connection->c_peer.nid,
1229                              &export->exp_nid_hash);
1230
1231         class_export_recovery_cleanup(export);
1232         class_unlink_export(export);
1233 no_disconn:
1234         class_export_put(export);
1235         RETURN(0);
1236 }
1237 EXPORT_SYMBOL(class_disconnect);
1238
1239 /* Return non-zero for a fully connected export */
1240 int class_connected_export(struct obd_export *exp)
1241 {
1242         int connected = 0;
1243
1244         if (exp) {
1245                 spin_lock(&exp->exp_lock);
1246                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1247                 spin_unlock(&exp->exp_lock);
1248         }
1249         return connected;
1250 }
1251 EXPORT_SYMBOL(class_connected_export);
1252
1253 static void class_disconnect_export_list(struct list_head *list,
1254                                          enum obd_option flags)
1255 {
1256         int rc;
1257         struct obd_export *exp;
1258         ENTRY;
1259
1260         /* It's possible that an export may disconnect itself, but
1261          * nothing else will be added to this list. */
1262         while (!list_empty(list)) {
1263                 exp = list_entry(list->next, struct obd_export,
1264                                  exp_obd_chain);
1265                 /* need for safe call CDEBUG after obd_disconnect */
1266                 class_export_get(exp);
1267
1268                 spin_lock(&exp->exp_lock);
1269                 exp->exp_flags = flags;
1270                 spin_unlock(&exp->exp_lock);
1271
1272                 if (obd_uuid_equals(&exp->exp_client_uuid,
1273                                     &exp->exp_obd->obd_uuid)) {
1274                         CDEBUG(D_HA,
1275                                "exp %p export uuid == obd uuid, don't discon\n",
1276                                exp);
1277                         /* Need to delete this now so we don't end up pointing
1278                          * to work_list later when this export is cleaned up. */
1279                         list_del_init(&exp->exp_obd_chain);
1280                         class_export_put(exp);
1281                         continue;
1282                 }
1283
1284                 class_export_get(exp);
1285                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1286                        "last request at "CFS_TIME_T"\n",
1287                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1288                        exp, exp->exp_last_request_time);
1289                 /* release one export reference anyway */
1290                 rc = obd_disconnect(exp);
1291
1292                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1293                        obd_export_nid2str(exp), exp, rc);
1294                 class_export_put(exp);
1295         }
1296         EXIT;
1297 }
1298
1299 void class_disconnect_exports(struct obd_device *obd)
1300 {
1301         struct list_head work_list;
1302         ENTRY;
1303
1304         /* Move all of the exports from obd_exports to a work list, en masse. */
1305         INIT_LIST_HEAD(&work_list);
1306         spin_lock(&obd->obd_dev_lock);
1307         list_splice_init(&obd->obd_exports, &work_list);
1308         list_splice_init(&obd->obd_delayed_exports, &work_list);
1309         spin_unlock(&obd->obd_dev_lock);
1310
1311         if (!list_empty(&work_list)) {
1312                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1313                        "disconnecting them\n", obd->obd_minor, obd);
1314                 class_disconnect_export_list(&work_list,
1315                                              exp_flags_from_obd(obd));
1316         } else
1317                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1318                        obd->obd_minor, obd);
1319         EXIT;
1320 }
1321 EXPORT_SYMBOL(class_disconnect_exports);
1322
1323 /* Remove exports that have not completed recovery.
1324  */
1325 void class_disconnect_stale_exports(struct obd_device *obd,
1326                                     int (*test_export)(struct obd_export *))
1327 {
1328         struct list_head work_list;
1329         struct obd_export *exp, *n;
1330         int evicted = 0;
1331         ENTRY;
1332
1333         INIT_LIST_HEAD(&work_list);
1334         spin_lock(&obd->obd_dev_lock);
1335         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1336                                  exp_obd_chain) {
1337                 /* don't count self-export as client */
1338                 if (obd_uuid_equals(&exp->exp_client_uuid,
1339                                     &exp->exp_obd->obd_uuid))
1340                         continue;
1341
1342                 /* don't evict clients which have no slot in last_rcvd
1343                  * (e.g. lightweight connection) */
1344                 if (exp->exp_target_data.ted_lr_idx == -1)
1345                         continue;
1346
1347                 spin_lock(&exp->exp_lock);
1348                 if (exp->exp_failed || test_export(exp)) {
1349                         spin_unlock(&exp->exp_lock);
1350                         continue;
1351                 }
1352                 exp->exp_failed = 1;
1353                 spin_unlock(&exp->exp_lock);
1354
1355                 list_move(&exp->exp_obd_chain, &work_list);
1356                 evicted++;
1357                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1358                        obd->obd_name, exp->exp_client_uuid.uuid,
1359                        exp->exp_connection == NULL ? "<unknown>" :
1360                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1361                 print_export_data(exp, "EVICTING", 0);
1362         }
1363         spin_unlock(&obd->obd_dev_lock);
1364
1365         if (evicted)
1366                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1367                               obd->obd_name, evicted);
1368
1369         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1370                                                  OBD_OPT_ABORT_RECOV);
1371         EXIT;
1372 }
1373 EXPORT_SYMBOL(class_disconnect_stale_exports);
1374
1375 void class_fail_export(struct obd_export *exp)
1376 {
1377         int rc, already_failed;
1378
1379         spin_lock(&exp->exp_lock);
1380         already_failed = exp->exp_failed;
1381         exp->exp_failed = 1;
1382         spin_unlock(&exp->exp_lock);
1383
1384         if (already_failed) {
1385                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1386                        exp, exp->exp_client_uuid.uuid);
1387                 return;
1388         }
1389
1390         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1391                exp, exp->exp_client_uuid.uuid);
1392
1393         if (obd_dump_on_timeout)
1394                 libcfs_debug_dumplog();
1395
1396         /* need for safe call CDEBUG after obd_disconnect */
1397         class_export_get(exp);
1398
1399         /* Most callers into obd_disconnect are removing their own reference
1400          * (request, for example) in addition to the one from the hash table.
1401          * We don't have such a reference here, so make one. */
1402         class_export_get(exp);
1403         rc = obd_disconnect(exp);
1404         if (rc)
1405                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1406         else
1407                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1408                        exp, exp->exp_client_uuid.uuid);
1409         class_export_put(exp);
1410 }
1411 EXPORT_SYMBOL(class_fail_export);
1412
1413 char *obd_export_nid2str(struct obd_export *exp)
1414 {
1415         if (exp->exp_connection != NULL)
1416                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1417
1418         return "(no nid)";
1419 }
1420 EXPORT_SYMBOL(obd_export_nid2str);
1421
1422 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1423 {
1424         struct cfs_hash *nid_hash;
1425         struct obd_export *doomed_exp = NULL;
1426         int exports_evicted = 0;
1427
1428         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1429
1430         spin_lock(&obd->obd_dev_lock);
1431         /* umount has run already, so evict thread should leave
1432          * its task to umount thread now */
1433         if (obd->obd_stopping) {
1434                 spin_unlock(&obd->obd_dev_lock);
1435                 return exports_evicted;
1436         }
1437         nid_hash = obd->obd_nid_hash;
1438         cfs_hash_getref(nid_hash);
1439         spin_unlock(&obd->obd_dev_lock);
1440
1441         do {
1442                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1443                 if (doomed_exp == NULL)
1444                         break;
1445
1446                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1447                          "nid %s found, wanted nid %s, requested nid %s\n",
1448                          obd_export_nid2str(doomed_exp),
1449                          libcfs_nid2str(nid_key), nid);
1450                 LASSERTF(doomed_exp != obd->obd_self_export,
1451                          "self-export is hashed by NID?\n");
1452                 exports_evicted++;
1453                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1454                               "request\n", obd->obd_name,
1455                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1456                               obd_export_nid2str(doomed_exp));
1457                 class_fail_export(doomed_exp);
1458                 class_export_put(doomed_exp);
1459         } while (1);
1460
1461         cfs_hash_putref(nid_hash);
1462
1463         if (!exports_evicted)
1464                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1465                        obd->obd_name, nid);
1466         return exports_evicted;
1467 }
1468 EXPORT_SYMBOL(obd_export_evict_by_nid);
1469
1470 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1471 {
1472         struct cfs_hash *uuid_hash;
1473         struct obd_export *doomed_exp = NULL;
1474         struct obd_uuid doomed_uuid;
1475         int exports_evicted = 0;
1476
1477         spin_lock(&obd->obd_dev_lock);
1478         if (obd->obd_stopping) {
1479                 spin_unlock(&obd->obd_dev_lock);
1480                 return exports_evicted;
1481         }
1482         uuid_hash = obd->obd_uuid_hash;
1483         cfs_hash_getref(uuid_hash);
1484         spin_unlock(&obd->obd_dev_lock);
1485
1486         obd_str2uuid(&doomed_uuid, uuid);
1487         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1488                 CERROR("%s: can't evict myself\n", obd->obd_name);
1489                 cfs_hash_putref(uuid_hash);
1490                 return exports_evicted;
1491         }
1492
1493         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1494
1495         if (doomed_exp == NULL) {
1496                 CERROR("%s: can't disconnect %s: no exports found\n",
1497                        obd->obd_name, uuid);
1498         } else {
1499                 CWARN("%s: evicting %s at adminstrative request\n",
1500                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1501                 class_fail_export(doomed_exp);
1502                 class_export_put(doomed_exp);
1503                 exports_evicted++;
1504         }
1505         cfs_hash_putref(uuid_hash);
1506
1507         return exports_evicted;
1508 }
1509
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback used by print_export_data(); it
 * stays NULL (static storage is zero-initialised) until it is set. */
void (*class_export_dump_hook)(struct obd_export *);
#endif
1513
1514 static void print_export_data(struct obd_export *exp, const char *status,
1515                               int locks)
1516 {
1517         struct ptlrpc_reply_state *rs;
1518         struct ptlrpc_reply_state *first_reply = NULL;
1519         int nreplies = 0;
1520
1521         spin_lock(&exp->exp_lock);
1522         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1523                             rs_exp_list) {
1524                 if (nreplies == 0)
1525                         first_reply = rs;
1526                 nreplies++;
1527         }
1528         spin_unlock(&exp->exp_lock);
1529
1530         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1531                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1532                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1533                atomic_read(&exp->exp_rpc_count),
1534                atomic_read(&exp->exp_cb_count),
1535                atomic_read(&exp->exp_locks_count),
1536                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1537                nreplies, first_reply, nreplies > 3 ? "..." : "",
1538                exp->exp_last_committed);
1539 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1540         if (locks && class_export_dump_hook != NULL)
1541                 class_export_dump_hook(exp);
1542 #endif
1543 }
1544
1545 void dump_exports(struct obd_device *obd, int locks)
1546 {
1547         struct obd_export *exp;
1548
1549         spin_lock(&obd->obd_dev_lock);
1550         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1551                 print_export_data(exp, "ACTIVE", locks);
1552         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1553                 print_export_data(exp, "UNLINKED", locks);
1554         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1555                 print_export_data(exp, "DELAYED", locks);
1556         spin_unlock(&obd->obd_dev_lock);
1557         spin_lock(&obd_zombie_impexp_lock);
1558         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1559                 print_export_data(exp, "ZOMBIE", locks);
1560         spin_unlock(&obd_zombie_impexp_lock);
1561 }
1562
1563 void obd_exports_barrier(struct obd_device *obd)
1564 {
1565         int waited = 2;
1566         LASSERT(list_empty(&obd->obd_exports));
1567         spin_lock(&obd->obd_dev_lock);
1568         while (!list_empty(&obd->obd_unlinked_exports)) {
1569                 spin_unlock(&obd->obd_dev_lock);
1570                 set_current_state(TASK_UNINTERRUPTIBLE);
1571                 schedule_timeout(cfs_time_seconds(waited));
1572                 if (waited > 5 && IS_PO2(waited)) {
1573                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1574                                       "more than %d seconds. "
1575                                       "The obd refcount = %d. Is it stuck?\n",
1576                                       obd->obd_name, waited,
1577                                       atomic_read(&obd->obd_refcount));
1578                         dump_exports(obd, 1);
1579                 }
1580                 waited *= 2;
1581                 spin_lock(&obd->obd_dev_lock);
1582         }
1583         spin_unlock(&obd->obd_dev_lock);
1584 }
1585 EXPORT_SYMBOL(obd_exports_barrier);
1586
/* Number of zombie imports and exports still waiting to be destroyed;
 * modified under obd_zombie_impexp_lock.  Starts at zero (static
 * storage). */
static int zombies_count;
1589
1590 /**
1591  * kill zombie imports and exports
1592  */
1593 void obd_zombie_impexp_cull(void)
1594 {
1595         struct obd_import *import;
1596         struct obd_export *export;
1597         ENTRY;
1598
1599         do {
1600                 spin_lock(&obd_zombie_impexp_lock);
1601
1602                 import = NULL;
1603                 if (!list_empty(&obd_zombie_imports)) {
1604                         import = list_entry(obd_zombie_imports.next,
1605                                             struct obd_import,
1606                                             imp_zombie_chain);
1607                         list_del_init(&import->imp_zombie_chain);
1608                 }
1609
1610                 export = NULL;
1611                 if (!list_empty(&obd_zombie_exports)) {
1612                         export = list_entry(obd_zombie_exports.next,
1613                                             struct obd_export,
1614                                             exp_obd_chain);
1615                         list_del_init(&export->exp_obd_chain);
1616                 }
1617
1618                 spin_unlock(&obd_zombie_impexp_lock);
1619
1620                 if (import != NULL) {
1621                         class_import_destroy(import);
1622                         spin_lock(&obd_zombie_impexp_lock);
1623                         zombies_count--;
1624                         spin_unlock(&obd_zombie_impexp_lock);
1625                 }
1626
1627                 if (export != NULL) {
1628                         class_export_destroy(export);
1629                         spin_lock(&obd_zombie_impexp_lock);
1630                         zombies_count--;
1631                         spin_unlock(&obd_zombie_impexp_lock);
1632                 }
1633
1634                 cond_resched();
1635         } while (import != NULL || export != NULL);
1636         EXIT;
1637 }
1638
1639 static struct completion        obd_zombie_start;
1640 static struct completion        obd_zombie_stop;
1641 static unsigned long            obd_zombie_flags;
1642 static wait_queue_head_t        obd_zombie_waitq;
1643 static pid_t                    obd_zombie_pid;
1644
1645 enum {
1646         OBD_ZOMBIE_STOP         = 0x0001,
1647 };
1648
1649 /**
1650  * check for work for kill zombie import/export thread.
1651  */
1652 static int obd_zombie_impexp_check(void *arg)
1653 {
1654         int rc;
1655
1656         spin_lock(&obd_zombie_impexp_lock);
1657         rc = (zombies_count == 0) &&
1658              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1659         spin_unlock(&obd_zombie_impexp_lock);
1660
1661         RETURN(rc);
1662 }
1663
1664 /**
1665  * Add export to the obd_zombe thread and notify it.
1666  */
1667 static void obd_zombie_export_add(struct obd_export *exp) {
1668         atomic_dec(&obd_stale_export_num);
1669         spin_lock(&exp->exp_obd->obd_dev_lock);
1670         LASSERT(!list_empty(&exp->exp_obd_chain));
1671         list_del_init(&exp->exp_obd_chain);
1672         spin_unlock(&exp->exp_obd->obd_dev_lock);
1673         spin_lock(&obd_zombie_impexp_lock);
1674         zombies_count++;
1675         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1676         spin_unlock(&obd_zombie_impexp_lock);
1677
1678         obd_zombie_impexp_notify();
1679 }
1680
1681 /**
1682  * Add import to the obd_zombe thread and notify it.
1683  */
1684 static void obd_zombie_import_add(struct obd_import *imp) {
1685         LASSERT(imp->imp_sec == NULL);
1686         spin_lock(&obd_zombie_impexp_lock);
1687         LASSERT(list_empty(&imp->imp_zombie_chain));
1688         zombies_count++;
1689         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1690         spin_unlock(&obd_zombie_impexp_lock);
1691
1692         obd_zombie_impexp_notify();
1693 }
1694
1695 /**
1696  * notify import/export destroy thread about new zombie.
1697  */
1698 static void obd_zombie_impexp_notify(void)
1699 {
1700         /*
1701          * Make sure obd_zomebie_impexp_thread get this notification.
1702          * It is possible this signal only get by obd_zombie_barrier, and
1703          * barrier gulps this notification and sleeps away and hangs ensues
1704          */
1705         wake_up_all(&obd_zombie_waitq);
1706 }
1707
1708 /**
1709  * check whether obd_zombie is idle
1710  */
1711 static int obd_zombie_is_idle(void)
1712 {
1713         int rc;
1714
1715         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1716         spin_lock(&obd_zombie_impexp_lock);
1717         rc = (zombies_count == 0);
1718         spin_unlock(&obd_zombie_impexp_lock);
1719         return rc;
1720 }
1721
1722 /**
1723  * wait when obd_zombie import/export queues become empty
1724  */
1725 void obd_zombie_barrier(void)
1726 {
1727         struct l_wait_info lwi = { 0 };
1728
1729         if (obd_zombie_pid == current_pid())
1730                 /* don't wait for myself */
1731                 return;
1732         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1733 }
1734 EXPORT_SYMBOL(obd_zombie_barrier);
1735
1736
1737 struct obd_export *obd_stale_export_get(void)
1738 {
1739         struct obd_export *exp = NULL;
1740         ENTRY;
1741
1742         spin_lock(&obd_stale_export_lock);
1743         if (!list_empty(&obd_stale_exports)) {
1744                 exp = list_entry(obd_stale_exports.next,
1745                                  struct obd_export, exp_stale_list);
1746                 list_del_init(&exp->exp_stale_list);
1747         }
1748         spin_unlock(&obd_stale_export_lock);
1749
1750         if (exp) {
1751                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1752                        atomic_read(&obd_stale_export_num));
1753         }
1754         RETURN(exp);
1755 }
1756 EXPORT_SYMBOL(obd_stale_export_get);
1757
1758 void obd_stale_export_put(struct obd_export *exp)
1759 {
1760         ENTRY;
1761
1762         LASSERT(list_empty(&exp->exp_stale_list));
1763         if (exp->exp_lock_hash &&
1764             atomic_read(&exp->exp_lock_hash->hs_count)) {
1765                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1766                        atomic_read(&obd_stale_export_num));
1767
1768                 spin_lock_bh(&exp->exp_bl_list_lock);
1769                 spin_lock(&obd_stale_export_lock);
1770                 /* Add to the tail if there is no blocked locks,
1771                  * to the head otherwise. */
1772                 if (list_empty(&exp->exp_bl_list))
1773                         list_add_tail(&exp->exp_stale_list,
1774                                       &obd_stale_exports);
1775                 else
1776                         list_add(&exp->exp_stale_list,
1777                                  &obd_stale_exports);
1778
1779                 spin_unlock(&obd_stale_export_lock);
1780                 spin_unlock_bh(&exp->exp_bl_list_lock);
1781         } else {
1782                 class_export_put(exp);
1783         }
1784         EXIT;
1785 }
1786 EXPORT_SYMBOL(obd_stale_export_put);
1787
1788 /**
1789  * Adjust the position of the export in the stale list,
1790  * i.e. move to the head of the list if is needed.
1791  **/
1792 void obd_stale_export_adjust(struct obd_export *exp)
1793 {
1794         LASSERT(exp != NULL);
1795         spin_lock_bh(&exp->exp_bl_list_lock);
1796         spin_lock(&obd_stale_export_lock);
1797
1798         if (!list_empty(&exp->exp_stale_list) &&
1799             !list_empty(&exp->exp_bl_list))
1800                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1801
1802         spin_unlock(&obd_stale_export_lock);
1803         spin_unlock_bh(&exp->exp_bl_list_lock);
1804 }
1805 EXPORT_SYMBOL(obd_stale_export_adjust);
1806
1807 /**
1808  * destroy zombie export/import thread.
1809  */
1810 static int obd_zombie_impexp_thread(void *unused)
1811 {
1812         unshare_fs_struct();
1813         complete(&obd_zombie_start);
1814
1815         obd_zombie_pid = current_pid();
1816
1817         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1818                 struct l_wait_info lwi = { 0 };
1819
1820                 l_wait_event(obd_zombie_waitq,
1821                              !obd_zombie_impexp_check(NULL), &lwi);
1822                 obd_zombie_impexp_cull();
1823
1824                 /*
1825                  * Notify obd_zombie_barrier callers that queues
1826                  * may be empty.
1827                  */
1828                 wake_up(&obd_zombie_waitq);
1829         }
1830
1831         complete(&obd_zombie_stop);
1832
1833         RETURN(0);
1834 }
1835
1836
1837 /**
1838  * start destroy zombie import/export thread
1839  */
1840 int obd_zombie_impexp_init(void)
1841 {
1842         struct task_struct *task;
1843
1844         INIT_LIST_HEAD(&obd_zombie_imports);
1845
1846         INIT_LIST_HEAD(&obd_zombie_exports);
1847         spin_lock_init(&obd_zombie_impexp_lock);
1848         init_completion(&obd_zombie_start);
1849         init_completion(&obd_zombie_stop);
1850         init_waitqueue_head(&obd_zombie_waitq);
1851         obd_zombie_pid = 0;
1852
1853         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1854         if (IS_ERR(task))
1855                 RETURN(PTR_ERR(task));
1856
1857         wait_for_completion(&obd_zombie_start);
1858         RETURN(0);
1859 }
1860 /**
1861  * stop destroy zombie import/export thread
1862  */
1863 void obd_zombie_impexp_stop(void)
1864 {
1865         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1866         obd_zombie_impexp_notify();
1867         wait_for_completion(&obd_zombie_stop);
1868 }
1869
1870 /***** Kernel-userspace comm helpers *******/
1871
1872 /* Get length of entire message, including header */
1873 int kuc_len(int payload_len)
1874 {
1875         return sizeof(struct kuc_hdr) + payload_len;
1876 }
1877 EXPORT_SYMBOL(kuc_len);
1878
1879 /* Get a pointer to kuc header, given a ptr to the payload
1880  * @param p Pointer to payload area
1881  * @returns Pointer to kuc header
1882  */
1883 struct kuc_hdr * kuc_ptr(void *p)
1884 {
1885         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1886         LASSERT(lh->kuc_magic == KUC_MAGIC);
1887         return lh;
1888 }
1889 EXPORT_SYMBOL(kuc_ptr);
1890
1891 /* Test if payload is part of kuc message
1892  * @param p Pointer to payload area
1893  * @returns boolean
1894  */
1895 int kuc_ispayload(void *p)
1896 {
1897         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1898
1899         if (kh->kuc_magic == KUC_MAGIC)
1900                 return 1;
1901         else
1902                 return 0;
1903 }
1904 EXPORT_SYMBOL(kuc_ispayload);
1905
1906 /* Alloc space for a message, and fill in header
1907  * @return Pointer to payload area
1908  */
1909 void *kuc_alloc(int payload_len, int transport, int type)
1910 {
1911         struct kuc_hdr *lh;
1912         int len = kuc_len(payload_len);
1913
1914         OBD_ALLOC(lh, len);
1915         if (lh == NULL)
1916                 return ERR_PTR(-ENOMEM);
1917
1918         lh->kuc_magic = KUC_MAGIC;
1919         lh->kuc_transport = transport;
1920         lh->kuc_msgtype = type;
1921         lh->kuc_msglen = len;
1922
1923         return (void *)(lh + 1);
1924 }
1925 EXPORT_SYMBOL(kuc_alloc);
1926
/* Free a message given a pointer to its payload area.
 *
 * The header is located with kuc_ptr(), which asserts the magic, and
 * the whole message (header plus payload) is released.
 * @param p Pointer to payload area
 * @param payload_len Payload size in bytes; the freed size is
 *        kuc_len(payload_len), the same expression kuc_alloc() uses
 *        for the allocation size
 *
 * Not marked "inline": this is an exported function with external
 * linkage, so the specifier gains nothing and, under C99/C11 inline
 * semantics, an inline definition alone need not emit the symbol.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1934
1935 struct obd_request_slot_waiter {
1936         struct list_head        orsw_entry;
1937         wait_queue_head_t       orsw_waitq;
1938         bool                    orsw_signaled;
1939 };
1940
1941 static bool obd_request_slot_avail(struct client_obd *cli,
1942                                    struct obd_request_slot_waiter *orsw)
1943 {
1944         bool avail;
1945
1946         spin_lock(&cli->cl_loi_list_lock);
1947         avail = !!list_empty(&orsw->orsw_entry);
1948         spin_unlock(&cli->cl_loi_list_lock);
1949
1950         return avail;
1951 };
1952
1953 /*
1954  * For network flow control, the RPC sponsor needs to acquire a credit
1955  * before sending the RPC. The credits count for a connection is defined
1956  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1957  * the subsequent RPC sponsors need to wait until others released their
1958  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1959  */
1960 int obd_get_request_slot(struct client_obd *cli)
1961 {
1962         struct obd_request_slot_waiter   orsw;
1963         struct l_wait_info               lwi;
1964         int                              rc;
1965
1966         spin_lock(&cli->cl_loi_list_lock);
1967         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1968                 cli->cl_r_in_flight++;
1969                 spin_unlock(&cli->cl_loi_list_lock);
1970                 return 0;
1971         }
1972
1973         init_waitqueue_head(&orsw.orsw_waitq);
1974         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1975         orsw.orsw_signaled = false;
1976         spin_unlock(&cli->cl_loi_list_lock);
1977
1978         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1979         rc = l_wait_event(orsw.orsw_waitq,
1980                           obd_request_slot_avail(cli, &orsw) ||
1981                           orsw.orsw_signaled,
1982                           &lwi);
1983
1984         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1985          * freed but other (such as obd_put_request_slot) is using it. */
1986         spin_lock(&cli->cl_loi_list_lock);
1987         if (rc != 0) {
1988                 if (!orsw.orsw_signaled) {
1989                         if (list_empty(&orsw.orsw_entry))
1990                                 cli->cl_r_in_flight--;
1991                         else
1992                                 list_del(&orsw.orsw_entry);
1993                 }
1994         }
1995
1996         if (orsw.orsw_signaled) {
1997                 LASSERT(list_empty(&orsw.orsw_entry));
1998
1999                 rc = -EINTR;
2000         }
2001         spin_unlock(&cli->cl_loi_list_lock);
2002
2003         return rc;
2004 }
2005 EXPORT_SYMBOL(obd_get_request_slot);
2006
2007 void obd_put_request_slot(struct client_obd *cli)
2008 {
2009         struct obd_request_slot_waiter *orsw;
2010
2011         spin_lock(&cli->cl_loi_list_lock);
2012         cli->cl_r_in_flight--;
2013
2014         /* If there is free slot, wakeup the first waiter. */
2015         if (!list_empty(&cli->cl_loi_read_list) &&
2016             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2017                 orsw = list_entry(cli->cl_loi_read_list.next,
2018                                   struct obd_request_slot_waiter, orsw_entry);
2019                 list_del_init(&orsw->orsw_entry);
2020                 cli->cl_r_in_flight++;
2021                 wake_up(&orsw->orsw_waitq);
2022         }
2023         spin_unlock(&cli->cl_loi_list_lock);
2024 }
2025 EXPORT_SYMBOL(obd_put_request_slot);
2026
2027 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2028 {
2029         return cli->cl_max_rpcs_in_flight;
2030 }
2031 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2032
2033 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2034 {
2035         struct obd_request_slot_waiter *orsw;
2036         __u32                           old;
2037         int                             diff;
2038         int                             i;
2039         char                            *typ_name;
2040         int                             rc;
2041
2042         if (max > OBD_MAX_RIF_MAX || max < 1)
2043                 return -ERANGE;
2044
2045         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2046         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2047                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2048                  * strictly lower that max_rpcs_in_flight */
2049                 if (max < 2) {
2050                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2051                                "because it must be higher than "
2052                                "max_mod_rpcs_in_flight value",
2053                                cli->cl_import->imp_obd->obd_name);
2054                         return -ERANGE;
2055                 }
2056                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2057                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2058                         if (rc != 0)
2059                                 return rc;
2060                 }
2061         }
2062
2063         spin_lock(&cli->cl_loi_list_lock);
2064         old = cli->cl_max_rpcs_in_flight;
2065         cli->cl_max_rpcs_in_flight = max;
2066         diff = max - old;
2067
2068         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2069         for (i = 0; i < diff; i++) {
2070                 if (list_empty(&cli->cl_loi_read_list))
2071                         break;
2072
2073                 orsw = list_entry(cli->cl_loi_read_list.next,
2074                                   struct obd_request_slot_waiter, orsw_entry);
2075                 list_del_init(&orsw->orsw_entry);
2076                 cli->cl_r_in_flight++;
2077                 wake_up(&orsw->orsw_waitq);
2078         }
2079         spin_unlock(&cli->cl_loi_list_lock);
2080
2081         return 0;
2082 }
2083 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2084
2085 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2086 {
2087         return cli->cl_max_mod_rpcs_in_flight;
2088 }
2089 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2090
2091 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2092 {
2093         struct obd_connect_data *ocd;
2094         __u16 maxmodrpcs;
2095         __u16 prev;
2096
2097         if (max > OBD_MAX_RIF_MAX || max < 1)
2098                 return -ERANGE;
2099
2100         /* cannot exceed or equal max_rpcs_in_flight */
2101         if (max >= cli->cl_max_rpcs_in_flight) {
2102                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2103                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2104                        cli->cl_import->imp_obd->obd_name,
2105                        max, cli->cl_max_rpcs_in_flight);
2106                 return -ERANGE;
2107         }
2108
2109         /* cannot exceed max modify RPCs in flight supported by the server */
2110         ocd = &cli->cl_import->imp_connect_data;
2111         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2112                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2113         else
2114                 maxmodrpcs = 1;
2115         if (max > maxmodrpcs) {
2116                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2117                        "higher than max_mod_rpcs_per_client value (%hu) "
2118                        "returned by the server at connection\n",
2119                        cli->cl_import->imp_obd->obd_name,
2120                        max, maxmodrpcs);
2121                 return -ERANGE;
2122         }
2123
2124         spin_lock(&cli->cl_mod_rpcs_lock);
2125
2126         prev = cli->cl_max_mod_rpcs_in_flight;
2127         cli->cl_max_mod_rpcs_in_flight = max;
2128
2129         /* wakeup waiters if limit has been increased */
2130         if (cli->cl_max_mod_rpcs_in_flight > prev)
2131                 wake_up(&cli->cl_mod_rpcs_waitq);
2132
2133         spin_unlock(&cli->cl_mod_rpcs_lock);
2134
2135         return 0;
2136 }
2137 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2138
2139
2140 #define pct(a, b) (b ? a * 100 / b : 0)
2141 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2142                                struct seq_file *seq)
2143 {
2144         struct timeval now;
2145         unsigned long mod_tot = 0, mod_cum;
2146         int i;
2147
2148         do_gettimeofday(&now);
2149
2150         spin_lock(&cli->cl_mod_rpcs_lock);
2151
2152         seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
2153                    now.tv_sec, now.tv_usec);
2154         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2155                    cli->cl_mod_rpcs_in_flight);
2156
2157         seq_printf(seq, "\n\t\t\tmodify\n");
2158         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2159
2160         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2161
2162         mod_cum = 0;
2163         for (i = 0; i < OBD_HIST_MAX; i++) {
2164                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2165                 mod_cum += mod;
2166                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2167                                  i, mod, pct(mod, mod_tot),
2168                                  pct(mod_cum, mod_tot));
2169                 if (mod_cum == mod_tot)
2170                         break;
2171         }
2172
2173         spin_unlock(&cli->cl_mod_rpcs_lock);
2174
2175         return 0;
2176 }
2177 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2178 #undef pct
2179
2180
2181 /* The number of modify RPCs sent in parallel is limited
2182  * because the server has a finite number of slots per client to
2183  * store request result and ensure reply reconstruction when needed.
2184  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2185  * that takes into account server limit and cl_max_rpcs_in_flight
2186  * value.
2187  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2188  * one close request is allowed above the maximum.
2189  */
2190 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2191                                                  bool close_req)
2192 {
2193         bool avail;
2194
2195         /* A slot is available if
2196          * - number of modify RPCs in flight is less than the max
2197          * - it's a close RPC and no other close request is in flight
2198          */
2199         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2200                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2201
2202         return avail;
2203 }
2204
2205 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2206                                          bool close_req)
2207 {
2208         bool avail;
2209
2210         spin_lock(&cli->cl_mod_rpcs_lock);
2211         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2212         spin_unlock(&cli->cl_mod_rpcs_lock);
2213         return avail;
2214 }
2215
2216 /* Get a modify RPC slot from the obd client @cli according
2217  * to the kind of operation @opc that is going to be sent
2218  * and the intent @it of the operation if it applies.
2219  * If the maximum number of modify RPCs in flight is reached
2220  * the thread is put to sleep.
2221  * Returns the tag to be set in the request message. Tag 0
2222  * is reserved for non-modifying requests.
2223  */
2224 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2225                            struct lookup_intent *it)
2226 {
2227         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2228         bool                    close_req = false;
2229         __u16                   i, max;
2230
2231         /* read-only metadata RPCs don't consume a slot on MDT
2232          * for reply reconstruction
2233          */
2234         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2235                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2236                 return 0;
2237
2238         if (opc == MDS_CLOSE)
2239                 close_req = true;
2240
2241         do {
2242                 spin_lock(&cli->cl_mod_rpcs_lock);
2243                 max = cli->cl_max_mod_rpcs_in_flight;
2244                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2245                         /* there is a slot available */
2246                         cli->cl_mod_rpcs_in_flight++;
2247                         if (close_req)
2248                                 cli->cl_close_rpcs_in_flight++;
2249                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2250                                          cli->cl_mod_rpcs_in_flight);
2251                         /* find a free tag */
2252                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2253                                                 max + 1);
2254                         LASSERT(i < OBD_MAX_RIF_MAX);
2255                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2256                         spin_unlock(&cli->cl_mod_rpcs_lock);
2257                         /* tag 0 is reserved for non-modify RPCs */
2258                         return i + 1;
2259                 }
2260                 spin_unlock(&cli->cl_mod_rpcs_lock);
2261
2262                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2263                        "opc %u, max %hu\n",
2264                        cli->cl_import->imp_obd->obd_name, opc, max);
2265
2266                 l_wait_event(cli->cl_mod_rpcs_waitq,
2267                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2268         } while (true);
2269 }
2270 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2271
2272 /* Put a modify RPC slot from the obd client @cli according
2273  * to the kind of operation @opc that has been sent and the
2274  * intent @it of the operation if it applies.
2275  */
2276 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2277                           struct lookup_intent *it, __u16 tag)
2278 {
2279         bool                    close_req = false;
2280
2281         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2282                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2283                 return;
2284
2285         if (opc == MDS_CLOSE)
2286                 close_req = true;
2287
2288         spin_lock(&cli->cl_mod_rpcs_lock);
2289         cli->cl_mod_rpcs_in_flight--;
2290         if (close_req)
2291                 cli->cl_close_rpcs_in_flight--;
2292         /* release the tag in the bitmap */
2293         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2294         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2295         spin_unlock(&cli->cl_mod_rpcs_lock);
2296         wake_up(&cli->cl_mod_rpcs_waitq);
2297 }
2298 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2299