lustre/osc/osc_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * Copyright (c) 2011 Whamcloud, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  */
39
40 #ifndef EXPORT_SYMTAB
41 # define EXPORT_SYMTAB
42 #endif
43 #define DEBUG_SUBSYSTEM S_OSC
44
45 #include <libcfs/libcfs.h>
46
47 #ifndef __KERNEL__
48 # include <liblustre.h>
49 #endif
50
51 #include <lustre_dlm.h>
52 #include <lustre_net.h>
53 #include <lustre/lustre_user.h>
54 #include <obd_cksum.h>
55 #include <obd_ost.h>
56 #include <obd_lov.h>
57
58 #ifdef  __CYGWIN__
59 # include <ctype.h>
60 #endif
61
62 #include <lustre_ha.h>
63 #include <lprocfs_status.h>
64 #include <lustre_log.h>
65 #include <lustre_debug.h>
66 #include <lustre_param.h>
67 #include "osc_internal.h"
68
69 static quota_interface_t *quota_interface = NULL;
70 extern quota_interface_t osc_quota_interface;
71
72 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
73 static int brw_interpret(const struct lu_env *env,
74                          struct ptlrpc_request *req, void *data, int rc);
75 int osc_cleanup(struct obd_device *obd);
76
77 /* Pack OSC object metadata for disk storage (LE byte order). */
78 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
79                       struct lov_stripe_md *lsm)
80 {
81         int lmm_size;
82         ENTRY;
83
84         lmm_size = sizeof(**lmmp);
85         if (!lmmp)
86                 RETURN(lmm_size);
87
88         if (*lmmp && !lsm) {
89                 OBD_FREE(*lmmp, lmm_size);
90                 *lmmp = NULL;
91                 RETURN(0);
92         }
93
94         if (!*lmmp) {
95                 OBD_ALLOC(*lmmp, lmm_size);
96                 if (!*lmmp)
97                         RETURN(-ENOMEM);
98         }
99
100         if (lsm) {
101                 LASSERT(lsm->lsm_object_id);
102                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
103                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
104                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
105         }
106
107         RETURN(lmm_size);
108 }
109
110 /* Unpack OSC object metadata from disk storage (LE byte order). */
111 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
112                         struct lov_mds_md *lmm, int lmm_bytes)
113 {
114         int lsm_size;
115         struct obd_import *imp = class_exp2cliimp(exp);
116         ENTRY;
117
118         if (lmm != NULL) {
119                 if (lmm_bytes < sizeof (*lmm)) {
120                         CERROR("lov_mds_md too small: %d, need %d\n",
121                                lmm_bytes, (int)sizeof(*lmm));
122                         RETURN(-EINVAL);
123                 }
124                 /* XXX LOV_MAGIC etc check? */
125
126                 if (lmm->lmm_object_id == 0) {
127                         CERROR("lov_mds_md: zero lmm_object_id\n");
128                         RETURN(-EINVAL);
129                 }
130         }
131
132         lsm_size = lov_stripe_md_size(1);
133         if (lsmp == NULL)
134                 RETURN(lsm_size);
135
136         if (*lsmp != NULL && lmm == NULL) {
137                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
138                 OBD_FREE(*lsmp, lsm_size);
139                 *lsmp = NULL;
140                 RETURN(0);
141         }
142
143         if (*lsmp == NULL) {
144                 OBD_ALLOC(*lsmp, lsm_size);
145                 if (*lsmp == NULL)
146                         RETURN(-ENOMEM);
147                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
148                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
149                         OBD_FREE(*lsmp, lsm_size);
150                         RETURN(-ENOMEM);
151                 }
152                 loi_init((*lsmp)->lsm_oinfo[0]);
153         }
154
155         if (lmm != NULL) {
156                 /* XXX zero *lsmp? */
157                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
158                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
159                 LASSERT((*lsmp)->lsm_object_id);
160                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
161         }
162
163         if (imp != NULL &&
164             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
165                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
166         else
167                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
168
169         RETURN(lsm_size);
170 }
171
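/*
 * Usage sketch: both osc_packmd() and osc_unpackmd() follow the obd_packmd()
 * convention that a NULL buffer pointer is a pure size query, while passing
 * a buffer together with a NULL md frees it.  A hypothetical caller (not
 * part of this file) might look like:
 */
#if 0
static int example_packmd_roundtrip(struct obd_export *exp,
                                    struct lov_stripe_md *lsm)
{
        struct lov_mds_md *lmm = NULL;
        int rc;

        rc = osc_packmd(exp, NULL, NULL);       /* size query only */
        if (rc < 0)
                return rc;

        rc = osc_packmd(exp, &lmm, lsm);        /* allocates and fills lmm */
        if (rc < 0)
                return rc;

        /* ... lmm now holds the object id/seq in LE byte order ... */

        return osc_packmd(exp, &lmm, NULL);     /* frees lmm, returns 0 */
}
#endif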
172 static inline void osc_pack_capa(struct ptlrpc_request *req,
173                                  struct ost_body *body, void *capa)
174 {
175         struct obd_capa *oc = (struct obd_capa *)capa;
176         struct lustre_capa *c;
177
178         if (!capa)
179                 return;
180
181         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
182         LASSERT(c);
183         capa_cpy(c, oc);
184         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
185         DEBUG_CAPA(D_SEC, c, "pack");
186 }
187
188 static inline void osc_pack_req_body(struct ptlrpc_request *req,
189                                      struct obd_info *oinfo)
190 {
191         struct ost_body *body;
192
193         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
194         LASSERT(body);
195
196         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
197         osc_pack_capa(req, body, oinfo->oi_capa);
198 }
199
200 static inline void osc_set_capa_size(struct ptlrpc_request *req,
201                                      const struct req_msg_field *field,
202                                      struct obd_capa *oc)
203 {
204         if (oc == NULL)
205                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
206         else
207                 /* the size already defaults to sizeof(struct obd_capa) */
208                 ;
209 }
210
211 static int osc_getattr_interpret(const struct lu_env *env,
212                                  struct ptlrpc_request *req,
213                                  struct osc_async_args *aa, int rc)
214 {
215         struct ost_body *body;
216         ENTRY;
217
218         if (rc != 0)
219                 GOTO(out, rc);
220
221         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
222         if (body) {
223                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
224                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
225
226                 /* This should really be sent by the OST */
227                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
228                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
229         } else {
230                 CDEBUG(D_INFO, "can't unpack ost_body\n");
231                 rc = -EPROTO;
232                 aa->aa_oi->oi_oa->o_valid = 0;
233         }
234 out:
235         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
236         RETURN(rc);
237 }
238
239 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
240                              struct ptlrpc_request_set *set)
241 {
242         struct ptlrpc_request *req;
243         struct osc_async_args *aa;
244         int                    rc;
245         ENTRY;
246
247         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
248         if (req == NULL)
249                 RETURN(-ENOMEM);
250
251         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
252         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
253         if (rc) {
254                 ptlrpc_request_free(req);
255                 RETURN(rc);
256         }
257
258         osc_pack_req_body(req, oinfo);
259
260         ptlrpc_request_set_replen(req);
261         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
262
263         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
264         aa = ptlrpc_req_async_args(req);
265         aa->aa_oi = oinfo;
266
267         ptlrpc_set_add_req(set, req);
268         RETURN(0);
269 }
270
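/*
 * Note on the idiom above: per-request callback state is embedded in the
 * request's rq_async_args scratch area instead of being allocated
 * separately, so it lives exactly as long as the request.  The CLASSERT()
 * proves at compile time that the args fit.  A minimal sketch, with a
 * hypothetical args struct:
 */
#if 0
struct my_async_args {
        struct obd_info *ma_oi;
};

static void example_attach_args(struct ptlrpc_request *req,
                                struct obd_info *oinfo)
{
        struct my_async_args *ma;

        CLASSERT(sizeof(*ma) <= sizeof(req->rq_async_args));
        ma = ptlrpc_req_async_args(req);
        ma->ma_oi = oinfo;      /* no separate free needed */
}
#endif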
271 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
272 {
273         struct ptlrpc_request *req;
274         struct ost_body       *body;
275         int                    rc;
276         ENTRY;
277
278         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
279         if (req == NULL)
280                 RETURN(-ENOMEM);
281
282         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
283         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
284         if (rc) {
285                 ptlrpc_request_free(req);
286                 RETURN(rc);
287         }
288
289         osc_pack_req_body(req, oinfo);
290
291         ptlrpc_request_set_replen(req);
292
293         rc = ptlrpc_queue_wait(req);
294         if (rc)
295                 GOTO(out, rc);
296
297         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
298         if (body == NULL)
299                 GOTO(out, rc = -EPROTO);
300
301         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
302         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
303
304         /* This should really be sent by the OST */
305         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
306         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
307
308         EXIT;
309  out:
310         ptlrpc_req_finished(req);
311         return rc;
312 }
313
314 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
315                        struct obd_trans_info *oti)
316 {
317         struct ptlrpc_request *req;
318         struct ost_body       *body;
319         int                    rc;
320         ENTRY;
321
322         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
323
324         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
325         if (req == NULL)
326                 RETURN(-ENOMEM);
327
328         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
329         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
330         if (rc) {
331                 ptlrpc_request_free(req);
332                 RETURN(rc);
333         }
334
335         osc_pack_req_body(req, oinfo);
336
337         ptlrpc_request_set_replen(req);
338
339         rc = ptlrpc_queue_wait(req);
340         if (rc)
341                 GOTO(out, rc);
342
343         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
344         if (body == NULL)
345                 GOTO(out, rc = -EPROTO);
346
347         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
348
349         EXIT;
350 out:
351         ptlrpc_req_finished(req);
352         RETURN(rc);
353 }
354
355 static int osc_setattr_interpret(const struct lu_env *env,
356                                  struct ptlrpc_request *req,
357                                  struct osc_setattr_args *sa, int rc)
358 {
359         struct ost_body *body;
360         ENTRY;
361
362         if (rc != 0)
363                 GOTO(out, rc);
364
365         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
366         if (body == NULL)
367                 GOTO(out, rc = -EPROTO);
368
369         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
370 out:
371         rc = sa->sa_upcall(sa->sa_cookie, rc);
372         RETURN(rc);
373 }
374
375 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
376                            struct obd_trans_info *oti,
377                            obd_enqueue_update_f upcall, void *cookie,
378                            struct ptlrpc_request_set *rqset)
379 {
380         struct ptlrpc_request   *req;
381         struct osc_setattr_args *sa;
382         int                      rc;
383         ENTRY;
384
385         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
386         if (req == NULL)
387                 RETURN(-ENOMEM);
388
389         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
390         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
391         if (rc) {
392                 ptlrpc_request_free(req);
393                 RETURN(rc);
394         }
395
396         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
397                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
398
399         osc_pack_req_body(req, oinfo);
400
401         ptlrpc_request_set_replen(req);
402
403         /* do the MDS-to-OST setattr asynchronously */
404         if (!rqset) {
405                 /* Do not wait for response. */
406                 ptlrpcd_add_req(req, PSCOPE_OTHER);
407         } else {
408                 req->rq_interpret_reply =
409                         (ptlrpc_interpterer_t)osc_setattr_interpret;
410
411                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
412                 sa = ptlrpc_req_async_args(req);
413                 sa->sa_oa = oinfo->oi_oa;
414                 sa->sa_upcall = upcall;
415                 sa->sa_cookie = cookie;
416
417                 if (rqset == PTLRPCD_SET)
418                         ptlrpcd_add_req(req, PSCOPE_OTHER);
419                 else
420                         ptlrpc_set_add_req(rqset, req);
421         }
422
423         RETURN(0);
424 }
425
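/*
 * The rqset argument of osc_setattr_async_base() selects one of three
 * dispatch modes used throughout this file:
 *   rqset == NULL        - fire-and-forget via ptlrpcd, reply is ignored;
 *   rqset == PTLRPCD_SET - ptlrpcd sends it, but the interpret callback runs;
 *   any other set        - the caller owns the set and waits on it.
 * A hypothetical synchronous wrapper built on that convention (a sketch,
 * not part of this file) could therefore be:
 */
#if 0
static int example_setattr_sync(struct obd_export *exp, struct obd_info *oinfo)
{
        struct ptlrpc_request_set *set;
        int rc;

        set = ptlrpc_prep_set();
        if (set == NULL)
                return -ENOMEM;

        rc = osc_setattr_async_base(exp, oinfo, NULL,
                                    oinfo->oi_cb_up, oinfo, set);
        if (rc == 0)
                rc = ptlrpc_set_wait(set);      /* runs the interpret callback */
        ptlrpc_set_destroy(set);
        return rc;
}
#endif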
426 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
427                              struct obd_trans_info *oti,
428                              struct ptlrpc_request_set *rqset)
429 {
430         return osc_setattr_async_base(exp, oinfo, oti,
431                                       oinfo->oi_cb_up, oinfo, rqset);
432 }
433
434 int osc_real_create(struct obd_export *exp, struct obdo *oa,
435                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
436 {
437         struct ptlrpc_request *req;
438         struct ost_body       *body;
439         struct lov_stripe_md  *lsm;
440         int                    rc;
441         ENTRY;
442
443         LASSERT(oa);
444         LASSERT(ea);
445
446         lsm = *ea;
447         if (!lsm) {
448                 rc = obd_alloc_memmd(exp, &lsm);
449                 if (rc < 0)
450                         RETURN(rc);
451         }
452
453         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
454         if (req == NULL)
455                 GOTO(out, rc = -ENOMEM);
456
457         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
458         if (rc) {
459                 ptlrpc_request_free(req);
460                 GOTO(out, rc);
461         }
462
463         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
464         LASSERT(body);
465         lustre_set_wire_obdo(&body->oa, oa);
466
467         ptlrpc_request_set_replen(req);
468
469         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
470             oa->o_flags == OBD_FL_DELORPHAN) {
471                 DEBUG_REQ(D_HA, req,
472                           "delorphan from OST integration");
473                 /* Don't resend the delorphan req */
474                 req->rq_no_resend = req->rq_no_delay = 1;
475         }
476
477         rc = ptlrpc_queue_wait(req);
478         if (rc)
479                 GOTO(out_req, rc);
480
481         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
482         if (body == NULL)
483                 GOTO(out_req, rc = -EPROTO);
484
485         lustre_get_wire_obdo(oa, &body->oa);
486
487         /* This should really be sent by the OST */
488         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
489         oa->o_valid |= OBD_MD_FLBLKSZ;
490
491         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
492          * have valid lsm_oinfo data structs, so don't go touching that.
493          * This needs to be fixed in a big way.
494          */
495         lsm->lsm_object_id = oa->o_id;
496         lsm->lsm_object_seq = oa->o_seq;
497         *ea = lsm;
498
499         if (oti != NULL) {
500                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
501
502                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
503                         if (!oti->oti_logcookies)
504                                 oti_alloc_cookies(oti, 1);
505                         *oti->oti_logcookies = oa->o_lcookie;
506                 }
507         }
508
509         CDEBUG(D_HA, "transno: "LPD64"\n",
510                lustre_msg_get_transno(req->rq_repmsg));
511 out_req:
512         ptlrpc_req_finished(req);
513 out:
514         if (rc && !*ea)
515                 obd_free_memmd(exp, &lsm);
516         RETURN(rc);
517 }
518
519 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
520                    obd_enqueue_update_f upcall, void *cookie,
521                    struct ptlrpc_request_set *rqset)
522 {
523         struct ptlrpc_request   *req;
524         struct osc_setattr_args *sa;
525         struct ost_body         *body;
526         int                      rc;
527         ENTRY;
528
529         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
530         if (req == NULL)
531                 RETURN(-ENOMEM);
532
533         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
534         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
535         if (rc) {
536                 ptlrpc_request_free(req);
537                 RETURN(rc);
538         }
539         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
540         ptlrpc_at_set_req_timeout(req);
541
542         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
543         LASSERT(body);
544         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
545         osc_pack_capa(req, body, oinfo->oi_capa);
546
547         ptlrpc_request_set_replen(req);
548
549
550         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
551         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
552         sa = ptlrpc_req_async_args(req);
553         sa->sa_oa     = oinfo->oi_oa;
554         sa->sa_upcall = upcall;
555         sa->sa_cookie = cookie;
556         if (rqset == PTLRPCD_SET)
557                 ptlrpcd_add_req(req, PSCOPE_OTHER);
558         else
559                 ptlrpc_set_add_req(rqset, req);
560
561         RETURN(0);
562 }
563
564 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
565                      struct obd_trans_info *oti,
566                      struct ptlrpc_request_set *rqset)
567 {
568         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
569         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
570         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
571         return osc_punch_base(exp, oinfo,
572                               oinfo->oi_cb_up, oinfo, rqset);
573 }
574
575 static int osc_sync_interpret(const struct lu_env *env,
576                               struct ptlrpc_request *req,
577                               void *arg, int rc)
578 {
579         struct osc_async_args *aa = arg;
580         struct ost_body *body;
581         ENTRY;
582
583         if (rc)
584                 GOTO(out, rc);
585
586         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
587         if (body == NULL) {
588                 CERROR ("can't unpack ost_body\n");
589                 GOTO(out, rc = -EPROTO);
590         }
591
592         *aa->aa_oi->oi_oa = body->oa;
593 out:
594         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
595         RETURN(rc);
596 }
597
598 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
599                     obd_size start, obd_size end,
600                     struct ptlrpc_request_set *set)
601 {
602         struct ptlrpc_request *req;
603         struct ost_body       *body;
604         struct osc_async_args *aa;
605         int                    rc;
606         ENTRY;
607
608         if (!oinfo->oi_oa) {
609                 CDEBUG(D_INFO, "oa NULL\n");
610                 RETURN(-EINVAL);
611         }
612
613         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
614         if (req == NULL)
615                 RETURN(-ENOMEM);
616
617         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
618         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
619         if (rc) {
620                 ptlrpc_request_free(req);
621                 RETURN(rc);
622         }
623
624         /* overload the size and blocks fields in the oa with start/end */
625         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
626         LASSERT(body);
627         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
628         body->oa.o_size = start;
629         body->oa.o_blocks = end;
630         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
631         osc_pack_capa(req, body, oinfo->oi_capa);
632
633         ptlrpc_request_set_replen(req);
634         req->rq_interpret_reply = osc_sync_interpret;
635
636         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
637         aa = ptlrpc_req_async_args(req);
638         aa->aa_oi = oinfo;
639
640         ptlrpc_set_add_req(set, req);
641         RETURN (0);
642 }
643
644 /* Find and locally cancel all locks matched by @mode in the resource
645  * identified by @oa. Found locks are added to the @cancels list. Returns
646  * the number of locks added to the @cancels list. */
647 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
648                                    cfs_list_t *cancels,
649                                    ldlm_mode_t mode, int lock_flags)
650 {
651         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
652         struct ldlm_res_id res_id;
653         struct ldlm_resource *res;
654         int count;
655         ENTRY;
656
657         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
658         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
659         if (res == NULL)
660                 RETURN(0);
661
662         LDLM_RESOURCE_ADDREF(res);
663         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
664                                            lock_flags, 0, NULL);
665         LDLM_RESOURCE_DELREF(res);
666         ldlm_resource_putref(res);
667         RETURN(count);
668 }
669
670 static int osc_destroy_interpret(const struct lu_env *env,
671                                  struct ptlrpc_request *req, void *data,
672                                  int rc)
673 {
674         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
675
676         cfs_atomic_dec(&cli->cl_destroy_in_flight);
677         cfs_waitq_signal(&cli->cl_destroy_waitq);
678         return 0;
679 }
680
681 static int osc_can_send_destroy(struct client_obd *cli)
682 {
683         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
684             cli->cl_max_rpcs_in_flight) {
685                 /* The destroy request can be sent */
686                 return 1;
687         }
688         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
689             cli->cl_max_rpcs_in_flight) {
690                 /*
691                  * The counter has been modified between the two atomic
692                  * operations.
693                  */
694                 cfs_waitq_signal(&cli->cl_destroy_waitq);
695         }
696         return 0;
697 }
698
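/*
 * Worked trace of the race window handled above, assuming
 * cl_max_rpcs_in_flight == 8 with 8 destroys already in flight: thread A
 * increments the counter 8 -> 9 and fails the first test; before A's
 * rollback, a completing destroy decrements 9 -> 8 and signals the waitq.
 * A's own decrement then returns 7 < 8, so A signals the waitq again,
 * guaranteeing that a wakeup racing between the two atomic operations is
 * never lost.
 */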
699 /* Destroy requests can always be async on the client, and we don't even really
700  * care about the return code, since the client cannot do anything at all about
701  * a destroy failure.
702  * When the MDS is unlinking a filename, it saves the file objects into a
703  * recovery llog, and these object records are cancelled when the OST reports
704  * they were destroyed and sync'd to disk (i.e. the transaction committed).
705  * If the client dies, or the OST is down when the object should be destroyed,
706  * the records are not cancelled, and when the OST next reconnects to the MDS,
707  * it will retrieve the llog unlink logs and then send the log cancellation
708  * cookies to the MDS after committing the destroy transactions. */
709 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
710                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
711                        struct obd_export *md_export, void *capa)
712 {
713         struct client_obd     *cli = &exp->exp_obd->u.cli;
714         struct ptlrpc_request *req;
715         struct ost_body       *body;
716         CFS_LIST_HEAD(cancels);
717         int rc, count;
718         ENTRY;
719
720         if (!oa) {
721                 CDEBUG(D_INFO, "oa NULL\n");
722                 RETURN(-EINVAL);
723         }
724
725         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
726                                         LDLM_FL_DISCARD_DATA);
727
728         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
729         if (req == NULL) {
730                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
731                 RETURN(-ENOMEM);
732         }
733
734         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
735         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
736                                0, &cancels, count);
737         if (rc) {
738                 ptlrpc_request_free(req);
739                 RETURN(rc);
740         }
741
742         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
743         ptlrpc_at_set_req_timeout(req);
744
745         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
746                 oa->o_lcookie = *oti->oti_logcookies;
747         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
748         LASSERT(body);
749         lustre_set_wire_obdo(&body->oa, oa);
750
751         osc_pack_capa(req, body, (struct obd_capa *)capa);
752         ptlrpc_request_set_replen(req);
753
754         /* don't throttle destroy RPCs for the MDT */
755         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
756                 req->rq_interpret_reply = osc_destroy_interpret;
757                 if (!osc_can_send_destroy(cli)) {
758                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
759                                                           NULL);
760
761                         /*
762                          * Wait until the number of on-going destroy RPCs drops
763                          * below cl_max_rpcs_in_flight
764                          */
765                         l_wait_event_exclusive(cli->cl_destroy_waitq,
766                                                osc_can_send_destroy(cli), &lwi);
767                 }
768         }
769
770         /* Do not wait for response */
771         ptlrpcd_add_req(req, PSCOPE_OTHER);
772         RETURN(0);
773 }
774
775 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
776                                 long writing_bytes)
777 {
778         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
779
780         LASSERT(!(oa->o_valid & bits));
781
782         oa->o_valid |= bits;
783         client_obd_list_lock(&cli->cl_loi_list_lock);
784         oa->o_dirty = cli->cl_dirty;
785         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
786                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
787                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
788                 oa->o_undirty = 0;
789         } else if (cfs_atomic_read(&obd_dirty_pages) -
790                    cfs_atomic_read(&obd_dirty_transit_pages) >
791                    obd_max_dirty_pages + 1) {
792                 /* The cfs_atomic_read() and the cfs_atomic_inc() are not
793                  * covered by a lock, thus they may safely race and trip
794                  * this CERROR() unless we add in a small fudge factor (+1). */
795                 CERROR("dirty %d - %d > system dirty_max %d\n",
796                        cfs_atomic_read(&obd_dirty_pages),
797                        cfs_atomic_read(&obd_dirty_transit_pages),
798                        obd_max_dirty_pages);
799                 oa->o_undirty = 0;
800         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
801                 CERROR("dirty %lu - dirty_max %lu too big???\n",
802                        cli->cl_dirty, cli->cl_dirty_max);
803                 oa->o_undirty = 0;
804         } else {
805                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
806                                 (cli->cl_max_rpcs_in_flight + 1);
807                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
808         }
809         oa->o_grant = cli->cl_avail_grant;
810         oa->o_dropped = cli->cl_lost_grant;
811         cli->cl_lost_grant = 0;
812         client_obd_list_unlock(&cli->cl_loi_list_lock);
813         CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
814                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
815
816 }
817
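/*
 * Worked example for the o_undirty branch above, assuming 4 KiB pages,
 * cl_max_pages_per_rpc == 256 and cl_max_rpcs_in_flight == 8:
 * max_in_flight = (256 << 12) * (8 + 1) = 9 MiB, so with, say, a 32 MiB
 * cl_dirty_max the client advertises o_undirty = max(32 MiB, 9 MiB) = 32 MiB.
 */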
818 static void osc_update_next_shrink(struct client_obd *cli)
819 {
820         cli->cl_next_shrink_grant =
821                 cfs_time_shift(cli->cl_grant_shrink_interval);
822         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
823                cli->cl_next_shrink_grant);
824 }
825
826 /* caller must hold loi_list_lock */
827 static void osc_consume_write_grant(struct client_obd *cli,
828                                     struct brw_page *pga)
829 {
830         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
831         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
832         cfs_atomic_inc(&obd_dirty_pages);
833         cli->cl_dirty += CFS_PAGE_SIZE;
834         cli->cl_avail_grant -= CFS_PAGE_SIZE;
835         pga->flag |= OBD_BRW_FROM_GRANT;
836         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
837                CFS_PAGE_SIZE, pga, pga->pg);
838         LASSERT(cli->cl_avail_grant >= 0);
839         osc_update_next_shrink(cli);
840 }
841
842 /* The companion to osc_consume_write_grant, called when a brw has completed.
843  * Must be called with the loi_list_lock held. */
844 static void osc_release_write_grant(struct client_obd *cli,
845                                     struct brw_page *pga, int sent)
846 {
847         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
848         ENTRY;
849
850         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
851         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
852                 EXIT;
853                 return;
854         }
855
856         pga->flag &= ~OBD_BRW_FROM_GRANT;
857         cfs_atomic_dec(&obd_dirty_pages);
858         cli->cl_dirty -= CFS_PAGE_SIZE;
859         if (pga->flag & OBD_BRW_NOCACHE) {
860                 pga->flag &= ~OBD_BRW_NOCACHE;
861                 cfs_atomic_dec(&obd_dirty_transit_pages);
862                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
863         }
864         if (!sent) {
865                 cli->cl_lost_grant += CFS_PAGE_SIZE;
866                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
867                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
868         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
869                 /* For short writes we shouldn't count parts of pages that
870                  * span a whole block on the OST side, or our accounting goes
871                  * wrong.  Should match the code in filter_grant_check. */
872                 int offset = pga->off & ~CFS_PAGE_MASK;
873                 int count = pga->count + (offset & (blocksize - 1));
874                 int end = (offset + pga->count) & (blocksize - 1);
875                 if (end)
876                         count += blocksize - end;
877
878                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
879                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
880                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
881                        cli->cl_avail_grant, cli->cl_dirty);
882         }
883
884         EXIT;
885 }
886
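/*
 * Worked example for the short-write accounting above, assuming a 4096-byte
 * page and a 1024-byte OST blocksize: for a page offset of 512 and
 * pga->count == 700, count = 700 + 512 = 1212, end = (512 + 700) & 1023 = 188,
 * so count += 1024 - 188, giving 2048.  The remaining 4096 - 2048 = 2048
 * bytes are added to cl_lost_grant.
 */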
887 static unsigned long rpcs_in_flight(struct client_obd *cli)
888 {
889         return cli->cl_r_in_flight + cli->cl_w_in_flight;
890 }
891
892 /* caller must hold loi_list_lock */
893 void osc_wake_cache_waiters(struct client_obd *cli)
894 {
895         cfs_list_t *l, *tmp;
896         struct osc_cache_waiter *ocw;
897
898         ENTRY;
899         cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
900                 /* if we can't dirty more, we must wait until some is written */
901                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
902                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
903                     obd_max_dirty_pages)) {
904                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
905                                "osc max %ld, sys max %d\n", cli->cl_dirty,
906                                cli->cl_dirty_max, obd_max_dirty_pages);
907                         return;
908                 }
909
910                 /* if the cache is still dirty but we have no grant, wait for
911                  * pending RPCs that may yet return grant before doing sync writes */
912                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
913                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
914                                cli->cl_w_in_flight);
915                         return;
916                 }
917
918                 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
919                 cfs_list_del_init(&ocw->ocw_entry);
920                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
921                         /* no more RPCs in flight to return grant, do sync IO */
922                         ocw->ocw_rc = -EDQUOT;
923                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
924                 } else {
925                         osc_consume_write_grant(cli,
926                                                 &ocw->ocw_oap->oap_brw_page);
927                 }
928
929                 cfs_waitq_signal(&ocw->ocw_waitq);
930         }
931
932         EXIT;
933 }
934
935 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
936 {
937         client_obd_list_lock(&cli->cl_loi_list_lock);
938         cli->cl_avail_grant += grant;
939         client_obd_list_unlock(&cli->cl_loi_list_lock);
940 }
941
942 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
943 {
944         if (body->oa.o_valid & OBD_MD_FLGRANT) {
945                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
946                 __osc_update_grant(cli, body->oa.o_grant);
947         }
948 }
949
950 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
951                               void *key, obd_count vallen, void *val,
952                               struct ptlrpc_request_set *set);
953
954 static int osc_shrink_grant_interpret(const struct lu_env *env,
955                                       struct ptlrpc_request *req,
956                                       void *aa, int rc)
957 {
958         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
959         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
960         struct ost_body *body;
961
962         if (rc != 0) {
963                 __osc_update_grant(cli, oa->o_grant);
964                 GOTO(out, rc);
965         }
966
967         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
968         LASSERT(body);
969         osc_update_grant(cli, body);
970 out:
971         OBDO_FREE(oa);
972         return rc;
973 }
974
975 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
976 {
977         client_obd_list_lock(&cli->cl_loi_list_lock);
978         oa->o_grant = cli->cl_avail_grant / 4;
979         cli->cl_avail_grant -= oa->o_grant;
980         client_obd_list_unlock(&cli->cl_loi_list_lock);
981         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
982                 oa->o_valid |= OBD_MD_FLFLAGS;
983                 oa->o_flags = 0;
984         }
985         oa->o_flags |= OBD_FL_SHRINK_GRANT;
986         osc_update_next_shrink(cli);
987 }
988
989 /* Shrink the current grant, either from some large amount to enough for a
990  * full set of in-flight RPCs, or if we have already shrunk to that limit
991  * then to enough for a single RPC.  This avoids keeping more grant than
992  * needed, and avoids shrinking the grant piecemeal. */
993 static int osc_shrink_grant(struct client_obd *cli)
994 {
995         long target = (cli->cl_max_rpcs_in_flight + 1) *
996                       cli->cl_max_pages_per_rpc;
997
998         client_obd_list_lock(&cli->cl_loi_list_lock);
999         if (cli->cl_avail_grant <= target)
1000                 target = cli->cl_max_pages_per_rpc;
1001         client_obd_list_unlock(&cli->cl_loi_list_lock);
1002
1003         return osc_shrink_grant_to_target(cli, target);
1004 }
1005
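/*
 * Example of the policy above, with cl_max_rpcs_in_flight == 8 and
 * cl_max_pages_per_rpc == 256: the first shrink targets (8 + 1) * 256 = 2304
 * units of grant; once cl_avail_grant is at or below that, the next shrink
 * targets a single RPC's worth, i.e. 256.
 */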
1006 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
1007 {
1008         int    rc = 0;
1009         struct ost_body     *body;
1010         ENTRY;
1011
1012         client_obd_list_lock(&cli->cl_loi_list_lock);
1013         /* Don't shrink if we are already at or below the desired limit.
1014          * We don't want to shrink below a single RPC, as that will negatively
1015          * impact block allocation and long-term performance. */
1016         if (target < cli->cl_max_pages_per_rpc)
1017                 target = cli->cl_max_pages_per_rpc;
1018
1019         if (target >= cli->cl_avail_grant) {
1020                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1021                 RETURN(0);
1022         }
1023         client_obd_list_unlock(&cli->cl_loi_list_lock);
1024
1025         OBD_ALLOC_PTR(body);
1026         if (!body)
1027                 RETURN(-ENOMEM);
1028
1029         osc_announce_cached(cli, &body->oa, 0);
1030
1031         client_obd_list_lock(&cli->cl_loi_list_lock);
1032         body->oa.o_grant = cli->cl_avail_grant - target;
1033         cli->cl_avail_grant = target;
1034         client_obd_list_unlock(&cli->cl_loi_list_lock);
1035         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1036                 body->oa.o_valid |= OBD_MD_FLFLAGS;
1037                 body->oa.o_flags = 0;
1038         }
1039         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1040         osc_update_next_shrink(cli);
1041
1042         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1043                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1044                                 sizeof(*body), body, NULL);
1045         if (rc != 0)
1046                 __osc_update_grant(cli, body->oa.o_grant);
1047         OBD_FREE_PTR(body);
1048         RETURN(rc);
1049 }
1050
1051 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1052 static int osc_should_shrink_grant(struct client_obd *client)
1053 {
1054         cfs_time_t time = cfs_time_current();
1055         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1056
1057         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1058              OBD_CONNECT_GRANT_SHRINK) == 0)
1059                 return 0;
1060
1061         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1062                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1063                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1064                         return 1;
1065                 else
1066                         osc_update_next_shrink(client);
1067         }
1068         return 0;
1069 }
1070
1071 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1072 {
1073         struct client_obd *client;
1074
1075         cfs_list_for_each_entry(client, &item->ti_obd_list,
1076                                 cl_grant_shrink_list) {
1077                 if (osc_should_shrink_grant(client))
1078                         osc_shrink_grant(client);
1079         }
1080         return 0;
1081 }
1082
1083 static int osc_add_shrink_grant(struct client_obd *client)
1084 {
1085         int rc;
1086
1087         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1088                                        TIMEOUT_GRANT,
1089                                        osc_grant_shrink_grant_cb, NULL,
1090                                        &client->cl_grant_shrink_list);
1091         if (rc) {
1092                 CERROR("add grant client %s error %d\n",
1093                         client->cl_import->imp_obd->obd_name, rc);
1094                 return rc;
1095         }
1096         CDEBUG(D_CACHE, "add grant client %s \n",
1097                client->cl_import->imp_obd->obd_name);
1098         osc_update_next_shrink(client);
1099         return 0;
1100 }
1101
1102 static int osc_del_shrink_grant(struct client_obd *client)
1103 {
1104         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1105                                          TIMEOUT_GRANT);
1106 }
1107
1108 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1109 {
1110         /*
1111          * ocd_grant is the total grant amount we expect to hold: if we've
1112          * been evicted, it's the new avail_grant amount, and cl_dirty will drop
1113          * to 0 as in-flight RPCs fail out; otherwise, it's avail_grant + dirty.
1114          *
1115          * A race is tolerable here: if we're evicted but imp_state has already
1116          * left the EVICTED state, then cl_dirty must already be 0.
1117          */
1118         client_obd_list_lock(&cli->cl_loi_list_lock);
1119         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1120                 cli->cl_avail_grant = ocd->ocd_grant;
1121         else
1122                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1123
1124         if (cli->cl_avail_grant < 0) {
1125                 CWARN("%s: available grant < 0; the OSS is probably not running"
1126                       " with the patch from bug20278 (%ld)\n",
1127                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1128                 /* workaround for 1.6 servers which do not have
1129                  * the patch from bug20278 */
1130                 cli->cl_avail_grant = ocd->ocd_grant;
1131         }
1132
1133         client_obd_list_unlock(&cli->cl_loi_list_lock);
1134
1135         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1136                cli->cl_import->imp_obd->obd_name,
1137                cli->cl_avail_grant, cli->cl_lost_grant);
1138
1139         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1140             cfs_list_empty(&cli->cl_grant_shrink_list))
1141                 osc_add_shrink_grant(cli);
1142 }
1143
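/*
 * Example of the connect-time math above: reconnecting with
 * ocd_grant == 2 MiB while cl_dirty == 512 KiB under a FULL import leaves
 * cl_avail_grant == 2 MiB - 512 KiB == 1536 KiB; after an eviction the
 * dirty pages are discarded, so cl_avail_grant is the full 2 MiB.
 */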
1144 /* We assume that this OSC got a short read because it read beyond the end
1145  * of a stripe object; i.e. Lustre is reading a sparse file via the LOV, and
1146  * it _knows_ it's reading inside the file, it's just that this stripe was
1147  * never written at or beyond this offset yet. */
1148 static void handle_short_read(int nob_read, obd_count page_count,
1149                               struct brw_page **pga)
1150 {
1151         char *ptr;
1152         int i = 0;
1153
1154         /* skip bytes read OK */
1155         while (nob_read > 0) {
1156                 LASSERT (page_count > 0);
1157
1158                 if (pga[i]->count > nob_read) {
1159                         /* EOF inside this page */
1160                         ptr = cfs_kmap(pga[i]->pg) +
1161                                 (pga[i]->off & ~CFS_PAGE_MASK);
1162                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1163                         cfs_kunmap(pga[i]->pg);
1164                         page_count--;
1165                         i++;
1166                         break;
1167                 }
1168
1169                 nob_read -= pga[i]->count;
1170                 page_count--;
1171                 i++;
1172         }
1173
1174         /* zero remaining pages */
1175         while (page_count-- > 0) {
1176                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1177                 memset(ptr, 0, pga[i]->count);
1178                 cfs_kunmap(pga[i]->pg);
1179                 i++;
1180         }
1181 }
1182
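/*
 * Worked example: a two-page read (4096 bytes each) that returns
 * nob_read == 6000 skips the fully-read pga[0] (6000 - 4096 = 1904 left),
 * then hits EOF inside pga[1]: bytes 0..1903 are kept and the remaining
 * 4096 - 1904 = 2192 bytes are zeroed, as is any page after it.
 */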
1183 static int check_write_rcs(struct ptlrpc_request *req,
1184                            int requested_nob, int niocount,
1185                            obd_count page_count, struct brw_page **pga)
1186 {
1187         int     i;
1188         __u32   *remote_rcs;
1189
1190         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1191                                                   sizeof(*remote_rcs) *
1192                                                   niocount);
1193         if (remote_rcs == NULL) {
1194                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1195                 return(-EPROTO);
1196         }
1197
1198         /* return error if any niobuf was in error */
1199         for (i = 0; i < niocount; i++) {
1200                 if ((int)remote_rcs[i] < 0)
1201                         return(remote_rcs[i]);
1202
1203                 if (remote_rcs[i] != 0) {
1204                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1205                                 i, remote_rcs[i], req);
1206                         return(-EPROTO);
1207                 }
1208         }
1209
1210         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1211                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1212                        req->rq_bulk->bd_nob_transferred, requested_nob);
1213                 return(-EPROTO);
1214         }
1215
1216         return (0);
1217 }
1218
1219 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1220 {
1221         if (p1->flag != p2->flag) {
1222                 unsigned mask = ~(OBD_BRW_FROM_GRANT|
1223                                   OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
1224
1225                 /* warn if we try to combine flags that we don't know to be
1226                  * safe to combine */
1227                 if ((p1->flag & mask) != (p2->flag & mask))
1228                         CERROR("is it ok to have flags 0x%x and 0x%x in the "
1229                                "same brw?\n", p1->flag, p2->flag);
1230                 return 0;
1231         }
1232
1233         return (p1->off + p1->count == p2->off);
1234 }
1235
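/*
 * Example: two 4096-byte pages at offsets 0 and 4096 with identical flags
 * merge into one niobuf.  Pages whose flags differ only within the
 * masked-out set (e.g. OBD_BRW_SYNC) are known safe and simply not merged;
 * flags differing outside that mask additionally trigger the CERROR()
 * warning.
 */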
1236 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1237                                    struct brw_page **pga, int opc,
1238                                    cksum_type_t cksum_type)
1239 {
1240         __u32 cksum;
1241         int i = 0;
1242
1243         LASSERT (pg_count > 0);
1244         cksum = init_checksum(cksum_type);
1245         while (nob > 0 && pg_count > 0) {
1246                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1247                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1248                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1249
1250                 /* corrupt the data before we compute the checksum, to
1251                  * simulate an OST->client data error */
1252                 if (i == 0 && opc == OST_READ &&
1253                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1254                         memcpy(ptr + off, "bad1", min(4, nob));
1255                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1256                 cfs_kunmap(pga[i]->pg);
1257                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1258                                off, cksum);
1259
1260                 nob -= pga[i]->count;
1261                 pg_count--;
1262                 i++;
1263         }
1264         /* For sending we only compute a wrong checksum instead of
1265          * corrupting the data, so it is still correct on a resend */
1266         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1267                 cksum++;
1268
1269         return cksum;
1270 }
1271
1272 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1273                                 struct lov_stripe_md *lsm, obd_count page_count,
1274                                 struct brw_page **pga,
1275                                 struct ptlrpc_request **reqp,
1276                                 struct obd_capa *ocapa, int reserve,
1277                                 int resend)
1278 {
1279         struct ptlrpc_request   *req;
1280         struct ptlrpc_bulk_desc *desc;
1281         struct ost_body         *body;
1282         struct obd_ioobj        *ioobj;
1283         struct niobuf_remote    *niobuf;
1284         int niocount, i, requested_nob, opc, rc;
1285         struct osc_brw_async_args *aa;
1286         struct req_capsule      *pill;
1287         struct brw_page *pg_prev;
1288
1289         ENTRY;
1290         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1291                 RETURN(-ENOMEM); /* Recoverable */
1292         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1293                 RETURN(-EINVAL); /* Fatal */
1294
1295         if ((cmd & OBD_BRW_WRITE) != 0) {
1296                 opc = OST_WRITE;
1297                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1298                                                 cli->cl_import->imp_rq_pool,
1299                                                 &RQF_OST_BRW_WRITE);
1300         } else {
1301                 opc = OST_READ;
1302                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1303         }
1304         if (req == NULL)
1305                 RETURN(-ENOMEM);
1306
1307         for (niocount = i = 1; i < page_count; i++) {
1308                 if (!can_merge_pages(pga[i - 1], pga[i]))
1309                         niocount++;
1310         }
1311
1312         pill = &req->rq_pill;
1313         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1314                              sizeof(*ioobj));
1315         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1316                              niocount * sizeof(*niobuf));
1317         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1318
1319         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1320         if (rc) {
1321                 ptlrpc_request_free(req);
1322                 RETURN(rc);
1323         }
1324         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1325         ptlrpc_at_set_req_timeout(req);
1326
1327         if (opc == OST_WRITE)
1328                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1329                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1330         else
1331                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1332                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1333
1334         if (desc == NULL)
1335                 GOTO(out, rc = -ENOMEM);
1336         /* NB: the request now owns desc and will free it when the request is freed */
1337
1338         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1339         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1340         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1341         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1342
1343         lustre_set_wire_obdo(&body->oa, oa);
1344
1345         obdo_to_ioobj(oa, ioobj);
1346         ioobj->ioo_bufcnt = niocount;
1347         osc_pack_capa(req, body, ocapa);
1348         LASSERT (page_count > 0);
1349         pg_prev = pga[0];
1350         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1351                 struct brw_page *pg = pga[i];
1352                 int poff = pg->off & ~CFS_PAGE_MASK;
1353
1354                 LASSERT(pg->count > 0);
1355                 /* make sure there is no gap in the middle of the page array */
1356                 LASSERTF(page_count == 1 ||
1357                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1358                           ergo(i > 0 && i < page_count - 1,
1359                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1360                           ergo(i == page_count - 1, poff == 0)),
1361                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1362                          i, page_count, pg, pg->off, pg->count);
1363 #ifdef __linux__
1364                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1365                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1366                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1367                          i, page_count,
1368                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1369                          pg_prev->pg, page_private(pg_prev->pg),
1370                          pg_prev->pg->index, pg_prev->off);
1371 #else
1372                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1373                          "i %d p_c %u\n", i, page_count);
1374 #endif
1375                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1376                         (pg->flag & OBD_BRW_SRVLOCK));
1377
1378                 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1379                 requested_nob += pg->count;
1380
1381                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1382                         niobuf--;
1383                         niobuf->len += pg->count;
1384                 } else {
1385                         niobuf->offset = pg->off;
1386                         niobuf->len    = pg->count;
1387                         niobuf->flags  = pg->flag;
1388                 }
1389                 pg_prev = pg;
1390         }
1391
1392         LASSERTF((void *)(niobuf - niocount) ==
1393                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1394                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1395                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1396
1397         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1398         if (resend) {
1399                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1400                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1401                         body->oa.o_flags = 0;
1402                 }
1403                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1404         }
1405
1406         if (osc_should_shrink_grant(cli))
1407                 osc_shrink_grant_local(cli, &body->oa);
1408
1409         /* size[REQ_REC_OFF] still sizeof (*body) */
1410         if (opc == OST_WRITE) {
1411                 if (unlikely(cli->cl_checksum) &&
1412                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1413                         /* store cl_cksum_type in a local variable since
1414                          * it can be changed via lprocfs */
1415                         cksum_type_t cksum_type = cli->cl_cksum_type;
1416
1417                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1418                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1419                                 body->oa.o_flags = 0;
1420                         }
1421                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1422                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1423                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1424                                                              page_count, pga,
1425                                                              OST_WRITE,
1426                                                              cksum_type);
1427                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1428                                body->oa.o_cksum);
1429                         /* save this in 'oa', too, for later checking */
1430                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1431                         oa->o_flags |= cksum_type_pack(cksum_type);
1432                 } else {
1433                         /* clear out the checksum flag, in case this is a
1434                          * resend but cl_checksum is no longer set. b=11238 */
1435                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1436                 }
1437                 oa->o_cksum = body->oa.o_cksum;
1438                 /* 1 RC per niobuf */
1439                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1440                                      sizeof(__u32) * niocount);
1441         } else {
1442                 if (unlikely(cli->cl_checksum) &&
1443                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1444                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1445                                 body->oa.o_flags = 0;
1446                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1447                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1448                 }
1449         }
1450         ptlrpc_request_set_replen(req);
1451
1452         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1453         aa = ptlrpc_req_async_args(req);
1454         aa->aa_oa = oa;
1455         aa->aa_requested_nob = requested_nob;
1456         aa->aa_nio_count = niocount;
1457         aa->aa_page_count = page_count;
1458         aa->aa_resends = 0;
1459         aa->aa_ppga = pga;
1460         aa->aa_cli = cli;
1461         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1462         if (ocapa && reserve)
1463                 aa->aa_ocapa = capa_get(ocapa);
1464
1465         *reqp = req;
1466         RETURN(0);
1467
1468  out:
1469         ptlrpc_req_finished(req);
1470         RETURN(rc);
1471 }
1472
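     /* Compare the client's write checksum with the checksum in the server's
      * reply.  Returns 0 when they match; returns 1 when they differ (or the
      * file is mmapped and may have changed underneath us), in which case the
      * caller resends the write.  Recomputing the checksum over the pages now
      * tells us whether the data changed on the client or in transit. */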
1473 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1474                                 __u32 client_cksum, __u32 server_cksum, int nob,
1475                                 obd_count page_count, struct brw_page **pga,
1476                                 cksum_type_t client_cksum_type)
1477 {
1478         __u32 new_cksum;
1479         char *msg;
1480         cksum_type_t cksum_type;
1481
1482         if (server_cksum == client_cksum) {
1483                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1484                 return 0;
1485         }
1486
1487         /* If this is an mmapped file, it can be changed at any time */
1488         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
1489                 return 1;
1490
1491         if (oa->o_valid & OBD_MD_FLFLAGS)
1492                 cksum_type = cksum_type_unpack(oa->o_flags);
1493         else
1494                 cksum_type = OBD_CKSUM_CRC32;
1495
1496         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1497                                       cksum_type);
1498
1499         if (cksum_type != client_cksum_type)
1500                 msg = "the server did not use the checksum type specified in "
1501                       "the original request - likely a protocol problem";
1502         else if (new_cksum == server_cksum)
1503                 msg = "changed on the client after we checksummed it - "
1504                       "likely false positive due to mmap IO (bug 11742)";
1505         else if (new_cksum == client_cksum)
1506                 msg = "changed in transit before arrival at OST";
1507         else
1508                 msg = "changed in transit AND doesn't match the original - "
1509                       "likely false positive due to mmap IO (bug 11742)";
1510
1511         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1512                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1513                            msg, libcfs_nid2str(peer->nid),
1514                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1515                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1516                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1517                            oa->o_id,
1518                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1519                            pga[0]->off,
1520                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1521         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1522                "client csum now %x\n", client_cksum, client_cksum_type,
1523                server_cksum, cksum_type, new_cksum);
1524         return 1;
1525 }
1526
1527 /* Note: rc enters this function as the number of bytes transferred */
1528 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1529 {
1530         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1531         const lnet_process_id_t *peer =
1532                         &req->rq_import->imp_connection->c_peer;
1533         struct client_obd *cli = aa->aa_cli;
1534         struct ost_body *body;
1535         __u32 client_cksum = 0;
1536         ENTRY;
1537
1538         if (rc < 0 && rc != -EDQUOT) {
1539                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1540                 RETURN(rc);
1541         }
1542
1543         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1544         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1545         if (body == NULL) {
1546                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1547                 RETURN(-EPROTO);
1548         }
1549
1550 #ifdef HAVE_QUOTA_SUPPORT
1551         /* set/clear over quota flag for a uid/gid */
1552         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1553             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1554                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1555
1556                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1557                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1558                        body->oa.o_flags);
1559                 lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
1560                              body->oa.o_flags);
1561         }
1562 #endif
1563
1564         osc_update_grant(cli, body);
1565
1566         if (rc < 0)
1567                 RETURN(rc);
1568
1569         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1570                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1571
1572         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1573                 if (rc > 0) {
1574                         CERROR("Unexpected +ve rc %d\n", rc);
1575                         RETURN(-EPROTO);
1576                 }
1577                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1578
1579                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1580                         RETURN(-EAGAIN);
1581
1582                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1583                     check_write_checksum(&body->oa, peer, client_cksum,
1584                                          body->oa.o_cksum, aa->aa_requested_nob,
1585                                          aa->aa_page_count, aa->aa_ppga,
1586                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1587                         RETURN(-EAGAIN);
1588
1589                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1590                                      aa->aa_page_count, aa->aa_ppga);
1591                 GOTO(out, rc);
1592         }
1593
1594         /* The rest of this function executes only for OST_READs */
1595
1596         /* if unwrap_bulk failed, return -EAGAIN to retry */
1597         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1598         if (rc < 0)
1599                 GOTO(out, rc = -EAGAIN);
1600
1601         if (rc > aa->aa_requested_nob) {
1602                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1603                        aa->aa_requested_nob);
1604                 RETURN(-EPROTO);
1605         }
1606
1607         if (rc != req->rq_bulk->bd_nob_transferred) {
1608                 CERROR("Unexpected rc %d (%d transferred)\n",
1609                        rc, req->rq_bulk->bd_nob_transferred);
1610                 RETURN(-EPROTO);
1611         }
1612
1613         if (rc < aa->aa_requested_nob)
1614                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1615
1616         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1617                 static int cksum_counter;
1618                 __u32      server_cksum = body->oa.o_cksum;
1619                 char      *via;
1620                 char      *router;
1621                 cksum_type_t cksum_type;
1622
1623                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
1624                         cksum_type = cksum_type_unpack(body->oa.o_flags);
1625                 else
1626                         cksum_type = OBD_CKSUM_CRC32;
1627                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1628                                                  aa->aa_ppga, OST_READ,
1629                                                  cksum_type);
1630
1631                 if (peer->nid == req->rq_bulk->bd_sender) {
1632                         via = router = "";
1633                 } else {
1634                         via = " via ";
1635                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1636                 }
1637
1638                 if (server_cksum == ~0 && rc > 0) {
1639                         CERROR("Protocol error: server %s set the 'checksum' "
1640                                "bit, but didn't send a checksum.  Not fatal, "
1641                                "but please notify on http://bugs.whamcloud.com/\n",
1642                                libcfs_nid2str(peer->nid));
1643                 } else if (server_cksum != client_cksum) {
1644                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1645                                            "%s%s%s inode "DFID" object "
1646                                            LPU64"/"LPU64" extent "
1647                                            "["LPU64"-"LPU64"]\n",
1648                                            req->rq_import->imp_obd->obd_name,
1649                                            libcfs_nid2str(peer->nid),
1650                                            via, router,
1651                                            body->oa.o_valid & OBD_MD_FLFID ?
1652                                                 body->oa.o_parent_seq : (__u64)0,
1653                                            body->oa.o_valid & OBD_MD_FLFID ?
1654                                                 body->oa.o_parent_oid : 0,
1655                                            body->oa.o_valid & OBD_MD_FLFID ?
1656                                                 body->oa.o_parent_ver : 0,
1657                                            body->oa.o_id,
1658                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1659                                                 body->oa.o_seq : (__u64)0,
1660                                            aa->aa_ppga[0]->off,
1661                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1662                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1663                                                                         1);
1664                         CERROR("client %x, server %x, cksum_type %x\n",
1665                                client_cksum, server_cksum, cksum_type);
1666                         cksum_counter = 0;
1667                         aa->aa_oa->o_cksum = client_cksum;
1668                         rc = -EAGAIN;
1669                 } else {
1670                         cksum_counter++;
1671                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1672                         rc = 0;
1673                 }
1674         } else if (unlikely(client_cksum)) {
1675                 static int cksum_missed;
1676
1677                 cksum_missed++;
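                      /* (x & -x) == x holds only for powers of two, so this
                       * logs the 1st, 2nd, 4th, 8th, ... miss to rate-limit
                       * the console */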
1678                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1679                         CERROR("Checksum %u requested from %s but not sent\n",
1680                                cksum_missed, libcfs_nid2str(peer->nid));
1681         } else {
1682                 rc = 0;
1683         }
1684 out:
1685         if (rc >= 0)
1686                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1687
1688         RETURN(rc);
1689 }
1690
1691 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1692                             struct lov_stripe_md *lsm,
1693                             obd_count page_count, struct brw_page **pga,
1694                             struct obd_capa *ocapa)
1695 {
1696         struct ptlrpc_request *req;
1697         int                    rc;
1698         cfs_waitq_t            waitq;
1699         int                    resends = 0;
1700         struct l_wait_info     lwi;
1701
1702         ENTRY;
1703
1704         cfs_waitq_init(&waitq);
1705
1706 restart_bulk:
1707         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1708                                   page_count, pga, &req, ocapa, 0, resends);
1709         if (rc != 0)
1710                 RETURN(rc);
1711
1712         rc = ptlrpc_queue_wait(req);
1713
1714         if (rc == -ETIMEDOUT && req->rq_resend) {
1715                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1716                 ptlrpc_req_finished(req);
1717                 goto restart_bulk;
1718         }
1719
1720         rc = osc_brw_fini_request(req, rc);
1721
1722         ptlrpc_req_finished(req);
1723         if (osc_recoverable_error(rc)) {
1724                 resends++;
1725                 if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
1726                         CERROR("too many resend retries, returning error\n");
1727                         RETURN(-EIO);
1728                 }
1729
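                      /* back off linearly: sleep 'resends' seconds before the
                       * next retry */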
1730                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
1731                 l_wait_event(waitq, 0, &lwi);
1732
1733                 goto restart_bulk;
1734         }
1735
1736         RETURN(rc);
1737 }
1738
1739 int osc_brw_redo_request(struct ptlrpc_request *request,
1740                          struct osc_brw_async_args *aa)
1741 {
1742         struct ptlrpc_request *new_req;
1743         struct ptlrpc_request_set *set = request->rq_set;
1744         struct osc_brw_async_args *new_aa;
1745         struct osc_async_page *oap;
1746         int rc = 0;
1747         ENTRY;
1748
1749         if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
1750                 CERROR("too many resend retries, returning error\n");
1751                 RETURN(-EIO);
1752         }
1753
1754         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1755
1756         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1757                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1758                                   aa->aa_cli, aa->aa_oa,
1759                                   NULL /* lsm unused by osc currently */,
1760                                   aa->aa_page_count, aa->aa_ppga,
1761                                   &new_req, aa->aa_ocapa, 0, 1);
1762         if (rc)
1763                 RETURN(rc);
1764
1765         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1766
1767         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1768                 if (oap->oap_request != NULL) {
1769                         LASSERTF(request == oap->oap_request,
1770                                  "request %p != oap_request %p\n",
1771                                  request, oap->oap_request);
1772                         if (oap->oap_interrupted) {
1773                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1774                                 ptlrpc_req_finished(new_req);
1775                                 RETURN(-EINTR);
1776                         }
1777                 }
1778         }
1779         /* New request takes over pga and oaps from old request.
1780          * Note that copying a list_head doesn't work, need to move it... */
1781         aa->aa_resends++;
1782         new_req->rq_interpret_reply = request->rq_interpret_reply;
1783         new_req->rq_async_args = request->rq_async_args;
1784         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1785
1786         new_aa = ptlrpc_req_async_args(new_req);
1787
1788         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1789         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1790         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1791
1792         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1793                 if (oap->oap_request) {
1794                         ptlrpc_req_finished(oap->oap_request);
1795                         oap->oap_request = ptlrpc_request_addref(new_req);
1796                 }
1797         }
1798
1799         new_aa->aa_ocapa = aa->aa_ocapa;
1800         aa->aa_ocapa = NULL;
1801
1802         /* using ptlrpc_set_add_req here is safe because interpret functions
1803          * run in check_set context.  The only path by which another thread
1804          * can reach the request and return -EINTR is protected by
1805          * cl_loi_list_lock above. */
1806         ptlrpc_set_add_req(set, new_req);
1807
1808         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1809
1810         DEBUG_REQ(D_INFO, new_req, "new request");
1811         RETURN(0);
1812 }
1813
1814 /*
1815  * We want disk allocation on the target to happen in offset order, so we'll
1816  * follow Sedgewick's advice and stick to the dead-simple shellsort -- it'll do
1817  * fine for our small page arrays and doesn't require allocation.  It's an
1818  * insertion sort that swaps elements that are strides apart, shrinking the
1819  * stride down until it's 1 and the array is sorted.
1820  */
1821 static void sort_brw_pages(struct brw_page **array, int num)
1822 {
1823         int stride, i, j;
1824         struct brw_page *tmp;
1825
1826         if (num == 1)
1827                 return;
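              /* grow the stride through the h = 3*h + 1 gap sequence
               * (1, 4, 13, 40, ...) until it reaches or passes num */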
1828         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1829                 ;
1830
1831         do {
1832                 stride /= 3;
1833                 for (i = stride ; i < num ; i++) {
1834                         tmp = array[i];
1835                         j = i;
1836                         while (j >= stride && array[j - stride]->off > tmp->off) {
1837                                 array[j] = array[j - stride];
1838                                 j -= stride;
1839                         }
1840                         array[j] = tmp;
1841                 }
1842         } while (stride > 1);
1843 }
1844
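     /* Count how many leading pages of pg[] form a single contiguous,
      * page-aligned run: the first page must end on a page boundary and each
      * later page must start on one, so that the whole run can be sent as
      * one unfragmented bulk transfer. */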
1845 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1846 {
1847         int count = 1;
1848         int offset;
1849         int i = 0;
1850
1851         LASSERT(pages > 0);
1852         offset = pg[i]->off & ~CFS_PAGE_MASK;
1853
1854         for (;;) {
1855                 pages--;
1856                 if (pages == 0)         /* that's all */
1857                         return count;
1858
1859                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1860                         return count;   /* doesn't end on page boundary */
1861
1862                 i++;
1863                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1864                 if (offset != 0)        /* doesn't start on page boundary */
1865                         return count;
1866
1867                 count++;
1868         }
1869 }
1870
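     /* Build an array of pointers into the caller's contiguous brw_page
      * array so the pages can be sorted and split into per-RPC chunks
      * without moving the pages themselves. */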
1871 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1872 {
1873         struct brw_page **ppga;
1874         int i;
1875
1876         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1877         if (ppga == NULL)
1878                 return NULL;
1879
1880         for (i = 0; i < count; i++)
1881                 ppga[i] = pga + i;
1882         return ppga;
1883 }
1884
1885 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1886 {
1887         LASSERT(ppga != NULL);
1888         OBD_FREE(ppga, sizeof(*ppga) * count);
1889 }
1890
1891 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1892                    obd_count page_count, struct brw_page *pga,
1893                    struct obd_trans_info *oti)
1894 {
1895         struct obdo *saved_oa = NULL;
1896         struct brw_page **ppga, **orig;
1897         struct obd_import *imp = class_exp2cliimp(exp);
1898         struct client_obd *cli;
1899         int rc, page_count_orig;
1900         ENTRY;
1901
1902         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1903         cli = &imp->imp_obd->u.cli;
1904
1905         if (cmd & OBD_BRW_CHECK) {
1906                 /* The caller just wants to know if there's a chance that this
1907                  * I/O can succeed */
1908
1909                 if (imp->imp_invalid)
1910                         RETURN(-EIO);
1911                 RETURN(0);
1912         }
1913
1914         /* test_brw with a failed create can trip this, maybe others. */
1915         LASSERT(cli->cl_max_pages_per_rpc);
1916
1917         rc = 0;
1918
1919         orig = ppga = osc_build_ppga(pga, page_count);
1920         if (ppga == NULL)
1921                 RETURN(-ENOMEM);
1922         page_count_orig = page_count;
1923
1924         sort_brw_pages(ppga, page_count);
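              /* issue the transfer in chunks: each BRW covers at most
               * cl_max_pages_per_rpc pages and stops early at any fragmented
               * (non page-aligned) boundary */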
1925         while (page_count) {
1926                 obd_count pages_per_brw;
1927
1928                 if (page_count > cli->cl_max_pages_per_rpc)
1929                         pages_per_brw = cli->cl_max_pages_per_rpc;
1930                 else
1931                         pages_per_brw = page_count;
1932
1933                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1934
1935                 if (saved_oa != NULL) {
1936                         /* restore previously saved oa */
1937                         *oinfo->oi_oa = *saved_oa;
1938                 } else if (page_count > pages_per_brw) {
1939                         /* save a copy of oa (brw will clobber it) */
1940                         OBDO_ALLOC(saved_oa);
1941                         if (saved_oa == NULL)
1942                                 GOTO(out, rc = -ENOMEM);
1943                         *saved_oa = *oinfo->oi_oa;
1944                 }
1945
1946                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1947                                       pages_per_brw, ppga, oinfo->oi_capa);
1948
1949                 if (rc != 0)
1950                         break;
1951
1952                 page_count -= pages_per_brw;
1953                 ppga += pages_per_brw;
1954         }
1955
1956 out:
1957         osc_release_ppga(orig, page_count_orig);
1958
1959         if (saved_oa != NULL)
1960                 OBDO_FREE(saved_oa);
1961
1962         RETURN(rc);
1963 }
1964
1965 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1966  * the dirty accounting: either writeback completed, or a truncate happened
1967  * before writing started.  Must be called with the loi lock held. */
1968 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1969                            int sent)
1970 {
1971         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1972 }
1973
1974
1975 /* This maintains the lists of pending pages to read/write for a given object
1976  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1977  * to quickly find objects that are ready to send an RPC. */
1978 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1979                          int cmd)
1980 {
1981         int optimal;
1982         ENTRY;
1983
1984         if (lop->lop_num_pending == 0)
1985                 RETURN(0);
1986
1987         /* if we have an invalid import we want to drain the queued pages
1988          * by forcing them through rpcs that immediately fail and complete
1989          * the pages.  recovery relies on this to empty the queued pages
1990          * before canceling the locks and evicting the llite pages */
1991         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
1992                 RETURN(1);
1993
1994         /* stream rpcs in queue order as long as there is an urgent page
1995          * queued.  this is our cheap solution for good batching in the case
1996          * where writepage marks some random page in the middle of the file
1997          * as urgent because of, say, memory pressure */
1998         if (!cfs_list_empty(&lop->lop_urgent)) {
1999                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2000                 RETURN(1);
2001         }
2002         /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
2003         optimal = cli->cl_max_pages_per_rpc;
2004         if (cmd & OBD_BRW_WRITE) {
2005                 /* trigger a write rpc stream as long as there are dirtiers
2006                  * waiting for space.  as they're waiting, they're not going to
2007                  * create more pages to coalesce with what's waiting.. */
2008                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2009                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2010                         RETURN(1);
2011                 }
2012                 /* +16 to avoid triggering rpcs that would want to include pages
2013                  * that are being queued but which can't be made ready until
2014                  * the queuer finishes with the page. this is a wart for
2015                  * llite::commit_write() */
2016                 optimal += 16;
2017         }
2018         if (lop->lop_num_pending >= optimal)
2019                 RETURN(1);
2020
2021         RETURN(0);
2022 }
2023
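     /* Returns 1 if the first urgent page is marked ASYNC_HP, meaning the
      * object needs a high-priority rpc (e.g. to service a lock that is
      * being canceled). */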
2024 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2025 {
2026         struct osc_async_page *oap;
2027         ENTRY;
2028
2029         if (cfs_list_empty(&lop->lop_urgent))
2030                 RETURN(0);
2031
2032         oap = cfs_list_entry(lop->lop_urgent.next,
2033                          struct osc_async_page, oap_urgent_item);
2034
2035         if (oap->oap_async_flags & ASYNC_HP) {
2036                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2037                 RETURN(1);
2038         }
2039
2040         RETURN(0);
2041 }
2042
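     /* keep @item's membership of @list in sync with the @should_be_on flag */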
2043 static void on_list(cfs_list_t *item, cfs_list_t *list,
2044                     int should_be_on)
2045 {
2046         if (cfs_list_empty(item) && should_be_on)
2047                 cfs_list_add_tail(item, list);
2048         else if (!cfs_list_empty(item) && !should_be_on)
2049                 cfs_list_del_init(item);
2050 }
2051
2052 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2053  * can find pages to build into rpcs quickly */
2054 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2055 {
2056         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2057             lop_makes_hprpc(&loi->loi_read_lop)) {
2058                 /* HP rpc */
2059                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2060                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2061         } else {
2062                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2063                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2064                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2065                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2066         }
2067
2068         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2069                 loi->loi_write_lop.lop_num_pending);
2070
2071         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2072                 loi->loi_read_lop.lop_num_pending);
2073 }
2074
2075 static void lop_update_pending(struct client_obd *cli,
2076                                struct loi_oap_pages *lop, int cmd, int delta)
2077 {
2078         lop->lop_num_pending += delta;
2079         if (cmd & OBD_BRW_WRITE)
2080                 cli->cl_pending_w_pages += delta;
2081         else
2082                 cli->cl_pending_r_pages += delta;
2083 }
2084
2085 /**
2086  * this is called when a sync waiter receives an interruption.  Its job is to
2087  * get the caller woken as soon as possible.  If its page hasn't been put in an
2088  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2089  * desiring interruption which will forcefully complete the rpc once the rpc
2090  * has timed out.
2091  */
2092 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2093 {
2094         struct loi_oap_pages *lop;
2095         struct lov_oinfo *loi;
2096         int rc = -EBUSY;
2097         ENTRY;
2098
2099         LASSERT(!oap->oap_interrupted);
2100         oap->oap_interrupted = 1;
2101
2102         /* ok, it's been put in an rpc. only one oap gets a request reference */
2103         if (oap->oap_request != NULL) {
2104                 ptlrpc_mark_interrupted(oap->oap_request);
2105                 ptlrpcd_wake(oap->oap_request);
2106                 ptlrpc_req_finished(oap->oap_request);
2107                 oap->oap_request = NULL;
2108         }
2109
2110         /*
2111          * page completion may be called only if ->cpo_prep() method was
2112          * executed by osc_io_submit(), which also adds the page to the pending list
2113          */
2114         if (!cfs_list_empty(&oap->oap_pending_item)) {
2115                 cfs_list_del_init(&oap->oap_pending_item);
2116                 cfs_list_del_init(&oap->oap_urgent_item);
2117
2118                 loi = oap->oap_loi;
2119                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2120                         &loi->loi_write_lop : &loi->loi_read_lop;
2121                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2122                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2123                 rc = oap->oap_caller_ops->ap_completion(env,
2124                                           oap->oap_caller_data,
2125                                           oap->oap_cmd, NULL, -EINTR);
2126         }
2127
2128         RETURN(rc);
2129 }
2130
2131 /* this is trying to propagate async writeback errors back up to the
2132  * application.  As an async write fails we record the error code for later if
2133  * the app does an fsync.  As long as errors persist we force future rpcs to be
2134  * sync so that the app can get a sync error and break the cycle of queueing
2135  * pages for which writeback will fail. */
2136 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2137                            int rc)
2138 {
2139         if (rc) {
2140                 if (!ar->ar_rc)
2141                         ar->ar_rc = rc;
2142
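                      /* force later rpcs sync until one sent after this failure
                       * (i.e. with xid >= ar_min_xid) completes successfully */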
2143                 ar->ar_force_sync = 1;
2144                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2145                 return;
2147         }
2148
2149         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2150                 ar->ar_force_sync = 0;
2151 }
2152
2153 void osc_oap_to_pending(struct osc_async_page *oap)
2154 {
2155         struct loi_oap_pages *lop;
2156
2157         if (oap->oap_cmd & OBD_BRW_WRITE)
2158                 lop = &oap->oap_loi->loi_write_lop;
2159         else
2160                 lop = &oap->oap_loi->loi_read_lop;
2161
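              /* ASYNC_HP pages jump to the head of the urgent list; merely
               * urgent pages queue at its tail */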
2162         if (oap->oap_async_flags & ASYNC_HP)
2163                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2164         else if (oap->oap_async_flags & ASYNC_URGENT)
2165                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2166         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2167         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2168 }
2169
2170 /* this must be called holding the loi list lock to give coverage to exit_cache,
2171  * async_flag maintenance, and oap_request */
2172 static void osc_ap_completion(const struct lu_env *env,
2173                               struct client_obd *cli, struct obdo *oa,
2174                               struct osc_async_page *oap, int sent, int rc)
2175 {
2176         __u64 xid = 0;
2177
2178         ENTRY;
2179         if (oap->oap_request != NULL) {
2180                 xid = ptlrpc_req_xid(oap->oap_request);
2181                 ptlrpc_req_finished(oap->oap_request);
2182                 oap->oap_request = NULL;
2183         }
2184
2185         cfs_spin_lock(&oap->oap_lock);
2186         oap->oap_async_flags = 0;
2187         cfs_spin_unlock(&oap->oap_lock);
2188         oap->oap_interrupted = 0;
2189
2190         if (oap->oap_cmd & OBD_BRW_WRITE) {
2191                 osc_process_ar(&cli->cl_ar, xid, rc);
2192                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2193         }
2194
2195         if (rc == 0 && oa != NULL) {
2196                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2197                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2198                 if (oa->o_valid & OBD_MD_FLMTIME)
2199                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2200                 if (oa->o_valid & OBD_MD_FLATIME)
2201                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2202                 if (oa->o_valid & OBD_MD_FLCTIME)
2203                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2204         }
2205
2206         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2207                                                 oap->oap_cmd, oa, rc);
2208
2209         /* ll_ap_completion (from llite) drops PG_locked. so, a new
2210          * I/O on the page could start, but OSC calls it under lock
2211          * and thus we can add oap back to pending safely */
2212         if (rc)
2213                 /* upper layer wants to leave the page on pending queue */
2214                 osc_oap_to_pending(oap);
2215         else
2216                 osc_exit_cache(cli, oap, sent);
2217         EXIT;
2218 }
2219
2220 static int brw_interpret(const struct lu_env *env,
2221                          struct ptlrpc_request *req, void *data, int rc)
2222 {
2223         struct osc_brw_async_args *aa = data;
2224         struct client_obd *cli;
2225         int async;
2226         ENTRY;
2227
2228         rc = osc_brw_fini_request(req, rc);
2229         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2230         if (osc_recoverable_error(rc)) {
2231                 /* Only retry once for mmapped files since the mmapped page
2232                  * might be modified at any time. We have to retry at least
2233                  * once in case there WAS really a corruption of the page
2234                  * on the network that was not caused by mmap() modifying
2235                  * the page. Bug11742 */
2236                 if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
2237                     aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
2238                     aa->aa_oa->o_flags & OBD_FL_MMAP) {
2239                         rc = 0;
2240                 } else {
2241                         rc = osc_brw_redo_request(req, aa);
2242                         if (rc == 0)
2243                                 RETURN(0);
2244                 }
2245         }
2246
2247         if (aa->aa_ocapa) {
2248                 capa_put(aa->aa_ocapa);
2249                 aa->aa_ocapa = NULL;
2250         }
2251
2252         cli = aa->aa_cli;
2253
2254         client_obd_list_lock(&cli->cl_loi_list_lock);
2255
2256         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2257          * is called so we know whether to go to sync BRWs or wait for more
2258          * RPCs to complete */
2259         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2260                 cli->cl_w_in_flight--;
2261         else
2262                 cli->cl_r_in_flight--;
2263
2264         async = cfs_list_empty(&aa->aa_oaps);
2265         if (!async) { /* from osc_send_oap_rpc() */
2266                 struct osc_async_page *oap, *tmp;
2267                 /* the caller may re-use the oap after the completion call so
2268                  * we need to clean it up a little */
2269                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2270                                              oap_rpc_item) {
2271                         cfs_list_del_init(&oap->oap_rpc_item);
2272                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2273                 }
2274                 OBDO_FREE(aa->aa_oa);
2275         } else { /* from async_internal() */
2276                 obd_count i;
2277                 for (i = 0; i < aa->aa_page_count; i++)
2278                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2279         }
2280         osc_wake_cache_waiters(cli);
2281         osc_check_rpcs(env, cli);
2282         client_obd_list_unlock(&cli->cl_loi_list_lock);
2283         if (!async)
2284                 cl_req_completion(env, aa->aa_clerq, rc);
2285         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2286
2287         RETURN(rc);
2288 }
2289
2290 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2291                                             struct client_obd *cli,
2292                                             cfs_list_t *rpc_list,
2293                                             int page_count, int cmd)
2294 {
2295         struct ptlrpc_request *req;
2296         struct brw_page **pga = NULL;
2297         struct osc_brw_async_args *aa;
2298         struct obdo *oa = NULL;
2299         const struct obd_async_page_ops *ops = NULL;
2300         void *caller_data = NULL;
2301         struct osc_async_page *oap;
2302         struct osc_async_page *tmp;
2303         struct ost_body *body;
2304         struct cl_req *clerq = NULL;
2305         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2306         struct ldlm_lock *lock = NULL;
2307         struct cl_req_attr crattr;
2308         int i, rc, mpflag = 0;
2309
2310         ENTRY;
2311         LASSERT(!cfs_list_empty(rpc_list));
2312
2313         if (cmd & OBD_BRW_MEMALLOC)
2314                 mpflag = cfs_memory_pressure_get_and_set();
2315
2316         memset(&crattr, 0, sizeof crattr);
2317         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2318         if (pga == NULL)
2319                 GOTO(out, req = ERR_PTR(-ENOMEM));
2320
2321         OBDO_ALLOC(oa);
2322         if (oa == NULL)
2323                 GOTO(out, req = ERR_PTR(-ENOMEM));
2324
2325         i = 0;
2326         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2327                 struct cl_page *page = osc_oap2cl_page(oap);
2328                 if (ops == NULL) {
2329                         ops = oap->oap_caller_ops;
2330                         caller_data = oap->oap_caller_data;
2331
2332                         clerq = cl_req_alloc(env, page, crt,
2333                                              1 /* only 1-object rpcs for
2334                                                 * now */);
2335                         if (IS_ERR(clerq))
2336                                 GOTO(out, req = (void *)clerq);
2337                         lock = oap->oap_ldlm_lock;
2338                 }
2339                 pga[i] = &oap->oap_brw_page;
2340                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2341                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2342                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2343                 i++;
2344                 cl_req_page_add(env, clerq, page);
2345         }
2346
2347         /* always get the data for the obdo for the rpc */
2348         LASSERT(ops != NULL);
2349         crattr.cra_oa = oa;
2350         crattr.cra_capa = NULL;
2351         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2352         if (lock) {
2353                 oa->o_handle = lock->l_remote_handle;
2354                 oa->o_valid |= OBD_MD_FLHANDLE;
2355         }
2356
2357         rc = cl_req_prep(env, clerq);
2358         if (rc != 0) {
2359                 CERROR("cl_req_prep failed: %d\n", rc);
2360                 GOTO(out, req = ERR_PTR(rc));
2361         }
2362
2363         sort_brw_pages(pga, page_count);
2364         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2365                                   pga, &req, crattr.cra_capa, 1, 0);
2366         if (rc != 0) {
2367                 CERROR("prep_req failed: %d\n", rc);
2368                 GOTO(out, req = ERR_PTR(rc));
2369         }
2370
2371         if (cmd & OBD_BRW_MEMALLOC)
2372                 req->rq_memalloc = 1;
2373
2374         /* Need to update the timestamps after the request is built in case
2375          * we race with setattr (locally or in queue at OST).  If OST gets
2376          * later setattr before earlier BRW (as determined by the request xid),
2377          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2378          * way to do this in a single call.  bug 10150 */
2379         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
2380         cl_req_attr_set(env, clerq, &crattr,
2381                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2382
2383         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2384         aa = ptlrpc_req_async_args(req);
2385         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2386         cfs_list_splice(rpc_list, &aa->aa_oaps);
2387         CFS_INIT_LIST_HEAD(rpc_list);
2388         aa->aa_clerq = clerq;
2389 out:
2390         if (cmd & OBD_BRW_MEMALLOC)
2391                 cfs_memory_pressure_restore(mpflag);
2392
2393         capa_put(crattr.cra_capa);
2394         if (IS_ERR(req)) {
2395                 if (oa)
2396                         OBDO_FREE(oa);
2397                 if (pga)
2398                         OBD_FREE(pga, sizeof(*pga) * page_count);
2399                 /* this should happen rarely and is pretty bad; it makes the
2400                  * pending list not follow the dirty order */
2401                 client_obd_list_lock(&cli->cl_loi_list_lock);
2402                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2403                         cfs_list_del_init(&oap->oap_rpc_item);
2404
2405                         /* queued sync pages can be torn down while the pages
2406                          * were between the pending list and the rpc */
2407                         if (oap->oap_interrupted) {
2408                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2409                                 osc_ap_completion(env, cli, NULL, oap, 0,
2410                                                   oap->oap_count);
2411                                 continue;
2412                         }
2413                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2414                 }
2415                 if (clerq && !IS_ERR(clerq))
2416                         cl_req_completion(env, clerq, PTR_ERR(req));
2417         }
2418         RETURN(req);
2419 }
2420
2421 /**
2422  * prepare pages for ASYNC io and put pages in send queue.
2423  *
2424  * \param cmd OBD_BRW_* macros
2425  * \param lop pending pages
2426  *
2427  * \return zero if no pages were added to the send queue.
2428  * \return 1 if pages were successfully added to the send queue.
2429  * \return negative on errors.
2430  */
2431 static int
2432 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2433                  struct lov_oinfo *loi,
2434                  int cmd, struct loi_oap_pages *lop)
2435 {
2436         struct ptlrpc_request *req;
2437         obd_count page_count = 0;
2438         struct osc_async_page *oap = NULL, *tmp;
2439         struct osc_brw_async_args *aa;
2440         const struct obd_async_page_ops *ops;
2441         CFS_LIST_HEAD(rpc_list);
2442         int srvlock = 0, mem_tight = 0;
2443         struct cl_object *clob = NULL;
2444         obd_off starting_offset = OBD_OBJECT_EOF;
2445         unsigned int ending_offset;
2446         int starting_page_off = 0;
2447         ENTRY;
2448
2449         /* ASYNC_HP pages first. At present, when the lock covering the pages
2450          * is to be canceled, the pages under that lock will be sent out
2451          * with ASYNC_HP. We have to send them out as soon as possible. */
2452         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2453                 if (oap->oap_async_flags & ASYNC_HP)
2454                         cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
2455                 if (++page_count >= cli->cl_max_pages_per_rpc)
2456                         break;
2457         }
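              /* page_count above only limited how many HP pages were promoted;
               * reset it before the real page scan below */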
2458         page_count = 0;
2459
2460         /* first we find the pages we're allowed to work with */
2461         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2462                                      oap_pending_item) {
2463                 ops = oap->oap_caller_ops;
2464
2465                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2466                          "magic 0x%x\n", oap, oap->oap_magic);
2467
2468                 if (clob == NULL) {
2469                         /* pin object in memory, so that completion call-backs
2470                          * can be safely called under client_obd_list lock. */
2471                         clob = osc_oap2cl_page(oap)->cp_obj;
2472                         cl_object_get(clob);
2473                 }
2474
2475                 if (page_count != 0 &&
2476                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2477                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2478                                " oap %p, page %p, srvlock %u\n",
2479                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2480                         break;
2481                 }
2482
2483                 /* If there is a gap at the start of this page, it can't merge
2484                  * with any previous page, so we'll hand the network a
2485                  * "fragmented" page array that it can't transfer in 1 RDMA */
2486                 if (oap->oap_obj_off < starting_offset) {
2487                         if (starting_page_off != 0)
2488                                 break;
2489
2490                         starting_page_off = oap->oap_page_off;
2491                         starting_offset = oap->oap_obj_off + starting_page_off;
2492                 } else if (oap->oap_page_off != 0)
2493                         break;
2494
2495                 /* in llite being 'ready' equates to the page being locked
2496                  * until completion unlocks it.  commit_write submits a page
2497                  * as not ready because its unlock will happen unconditionally
2498                  * as the call returns.  if we race with commit_write giving
2499                  * us that page we don't want to create a hole in the page
2500                  * stream, so we stop and leave the rpc to be fired by
2501                  * another dirtier or kupdated interval (the not ready page
2502                  * will still be on the dirty list).  we could call in
2503                  * at the end of ll_file_write to process the queue again. */
2504                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2505                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2506                                                     cmd);
2507                         if (rc < 0)
2508                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2509                                                 "instead of ready\n", oap,
2510                                                 oap->oap_page, rc);
2511                         switch (rc) {
2512                         case -EAGAIN:
2513                                 /* llite is telling us that the page is still
2514                                  * in commit_write and that we should try
2515                                  * and put it in an rpc again later.  we
2516                                  * break out of the loop so we don't create
2517                                  * a hole in the sequence of pages in the rpc
2518                                  * stream.*/
2519                                 oap = NULL;
2520                                 break;
2521                         case -EINTR:
2522                                 /* the IO isn't needed; tell the checks
2523                                  * below to complete the rpc with -EINTR */
2524                                 cfs_spin_lock(&oap->oap_lock);
2525                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2526                                 cfs_spin_unlock(&oap->oap_lock);
2527                                 oap->oap_count = -EINTR;
2528                                 break;
2529                         case 0:
2530                                 cfs_spin_lock(&oap->oap_lock);
2531                                 oap->oap_async_flags |= ASYNC_READY;
2532                                 cfs_spin_unlock(&oap->oap_lock);
2533                                 break;
2534                         default:
2535                                 LASSERTF(0, "oap %p page %p returned %d "
2536                                             "from make_ready\n", oap,
2537                                             oap->oap_page, rc);
2538                                 break;
2539                         }
2540                 }
2541                 if (oap == NULL)
2542                         break;
2543                 /*
2544                  * Page submitted for IO has to be locked. Either by
2545                  * ->ap_make_ready() or by higher layers.
2546                  */
2547 #if defined(__KERNEL__) && defined(__linux__)
2548                 {
2549                         struct cl_page *page;
2550
2551                         page = osc_oap2cl_page(oap);
2552
2553                         if (page->cp_type == CPT_CACHEABLE &&
2554                             !(PageLocked(oap->oap_page) &&
2555                               (CheckWriteback(oap->oap_page, cmd)))) {
2556                                 CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
2557                                        oap->oap_page,
2558                                        (long)oap->oap_page->flags,
2559                                        oap->oap_async_flags);
2560                                 LBUG();
2561                         }
2562                 }
2563 #endif
2564
2565                 /* take the page out of our book-keeping */
2566                 cfs_list_del_init(&oap->oap_pending_item);
2567                 lop_update_pending(cli, lop, cmd, -1);
2568                 cfs_list_del_init(&oap->oap_urgent_item);
2569
2570                 /* ask the caller for the size of the io as the rpc leaves. */
2571                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2572                         oap->oap_count =
2573                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2574                                                       cmd);
2575                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2576                 }
2577                 if (oap->oap_count <= 0) {
2578                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2579                                oap->oap_count);
2580                         osc_ap_completion(env, cli, NULL,
2581                                           oap, 0, oap->oap_count);
2582                         continue;
2583                 }
2584
2585                 /* now put the page back in our accounting */
2586                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2587                 if (page_count++ == 0)
2588                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2589
2590                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2591                         mem_tight = 1;
2592
2593                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2594                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2595                  * have the same alignment as the initial writes that allocated
2596                  * extents on the server. */
2597                 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2598                                 oap->oap_count;
2599                 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2600                         break;
2601
2602                 if (page_count >= cli->cl_max_pages_per_rpc)
2603                         break;
2604
2605                 /* If there is a gap at the end of this page, it can't merge
2606                  * with any subsequent pages, so we'll hand the network a
2607                  * "fragmented" page array that it can't transfer in 1 RDMA */
2608                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2609                         break;
2610         }
2611
2612         osc_wake_cache_waiters(cli);
2613
2614         loi_list_maint(cli, loi);
2615
2616         client_obd_list_unlock(&cli->cl_loi_list_lock);
2617
2618         if (clob != NULL)
2619                 cl_object_put(env, clob);
2620
2621         if (page_count == 0) {
2622                 client_obd_list_lock(&cli->cl_loi_list_lock);
2623                 RETURN(0);
2624         }
2625
2626         req = osc_build_req(env, cli, &rpc_list, page_count,
2627                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2628         if (IS_ERR(req)) {
2629                 LASSERT(cfs_list_empty(&rpc_list));
2630                 loi_list_maint(cli, loi);
2631                 RETURN(PTR_ERR(req));
2632         }
2633
2634         aa = ptlrpc_req_async_args(req);
2635
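              /* keep only the offset within one PTLRPC_MAX_BRW_SIZE window for
               * the per-rpc offset histograms below */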
2636         starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2637         if (cmd == OBD_BRW_READ) {
2638                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2639                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2640                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2641                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2642         } else {
2643                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2644                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2645                                  cli->cl_w_in_flight);
2646                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2647                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2648         }
2649         ptlrpc_lprocfs_brw(req, aa->aa_requested_nob);
2650
2651         client_obd_list_lock(&cli->cl_loi_list_lock);
2652
2653         if (cmd == OBD_BRW_READ)
2654                 cli->cl_r_in_flight++;
2655         else
2656                 cli->cl_w_in_flight++;
2657
2658         /* queued sync pages can be torn down while the pages
2659          * were between the pending list and the rpc */
2660         tmp = NULL;
2661         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2662                 /* only one oap gets a request reference */
2663                 if (tmp == NULL)
2664                         tmp = oap;
2665                 if (oap->oap_interrupted && !req->rq_intr) {
2666                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2667                                oap, req);
2668                         ptlrpc_mark_interrupted(req);
2669                 }
2670         }
2671         if (tmp != NULL)
2672                 tmp->oap_request = ptlrpc_request_addref(req);
2673
2674         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2675                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2676
2677         req->rq_interpret_reply = brw_interpret;
2678         ptlrpcd_add_req(req, PSCOPE_BRW);
2679         RETURN(1);
2680 }
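/* A note on osc_send_oap_rpc()'s return convention, relied upon by
 * osc_check_rpcs() below:
 *   > 0  an RPC was built and handed to ptlrpcd;
 *     0  nothing was sent (no pages ready, or make_ready backed off);
 *   < 0  building the request failed. */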
2681
2682 #define LOI_DEBUG(LOI, STR, args...)                                     \
2683         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2684                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2685                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2686                (LOI)->loi_write_lop.lop_num_pending,                     \
2687                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2688                (LOI)->loi_read_lop.lop_num_pending,                      \
2689                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2690                args)                                                     \
2691
2692 /* This is called by osc_check_rpcs() to find which objects have pages that
2693  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2694 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2695 {
2696         ENTRY;
2697
2698         /* First return objects that have blocked locks so that they
2699          * will be flushed quickly and other clients can get the lock,
2700          * then objects which have pages ready to be stuffed into RPCs */
2701         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2702                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2703                                       struct lov_oinfo, loi_hp_ready_item));
2704         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2705                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2706                                       struct lov_oinfo, loi_ready_item));
2707
2708         /* then if we have cache waiters, return all objects with queued
2709          * writes.  This is especially important when many small files
2710          * have filled up the cache but have not been fired into RPCs
2711          * because they don't pass the nr_pending-per-object threshold */
2712         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2713             !cfs_list_empty(&cli->cl_loi_write_list))
2714                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2715                                       struct lov_oinfo, loi_write_item));
2716
2717         /* then return all queued objects when we have an invalid import
2718          * so that they get flushed */
2719         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2720                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2721                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2722                                               struct lov_oinfo,
2723                                               loi_write_item));
2724                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2725                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2726                                               struct lov_oinfo, loi_read_item));
2727         }
2728         RETURN(NULL);
2729 }
2730
2731 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2732 {
2733         struct osc_async_page *oap;
2734         int hprpc = 0;
2735
2736         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2737                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2738                                      struct osc_async_page, oap_urgent_item);
2739                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2740         }
2741
2742         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2743                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2744                                      struct osc_async_page, oap_urgent_item);
2745                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2746         }
2747
2748         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2749 }
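/* For illustration: with cl_max_rpcs_in_flight = 8, a loi whose urgent list
 * is headed by an ASYNC_HP page raises the effective limit to 9, granting
 * one extra RPC slot so high-priority (lock-blocked) pages can be flushed
 * even when the normal quota of RPCs is already in flight. */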
2750
2751 /* called with the loi list lock held */
2752 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2753 {
2754         struct lov_oinfo *loi;
2755         int rc = 0, race_counter = 0;
2756         ENTRY;
2757
2758         while ((loi = osc_next_loi(cli)) != NULL) {
2759                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2760
2761                 if (osc_max_rpc_in_flight(cli, loi))
2762                         break;
2763
2764                 /* attempt some read/write balancing by alternating between
2765                  * reads and writes on an object.  The lop_makes_rpc() checks
2766                  * here would be redundant if we were handed read/write work
2767                  * items instead of objects.  We don't want osc_send_oap_rpc()
2768                  * to drain a partially-filled read pending queue when given
2769                  * this object for writes while there are cache waiters */
2770                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2771                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2772                                               &loi->loi_write_lop);
2773                         if (rc < 0) {
2774                                 CERROR("Write request failed with %d\n", rc);
2775
2776                                 /* osc_send_oap_rpc() failed, most likely
2777                                  * because of memory pressure.
2778                                  *
2779                                  * We must not break out here: if
2780                                  *  - a page was submitted by osc_io_submit()
2781                                  *    and is therefore locked,
2782                                  *  - no request is in flight, and
2783                                  *  - no subsequent request will be issued,
2784                                  * then the system live-locks, since
2785                                  * nothing will ever call osc_io_unplug()
2786                                  * or osc_check_rpcs() again.  pdflush
2787                                  * cannot help either, because it may be
2788                                  * blocked trying to grab that very
2789                                  * page lock.
2790                                  *
2791                                  * So keep draining pages instead. */
2792                                 /* break; */
2793                         }
2794
2795                         if (rc > 0)
2796                                 race_counter = 0;
2797                         else if (rc == 0)
2798                                 race_counter++;
2799                 }
2800                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2801                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2802                                               &loi->loi_read_lop);
2803                         if (rc < 0)
2804                                 CERROR("Read request failed with %d\n", rc);
2805
2806                         if (rc > 0)
2807                                 race_counter = 0;
2808                         else if (rc == 0)
2809                                 race_counter++;
2810                 }
2811
2812                 /* attempt some inter-object balancing by issuing rpcs
2813                  * for each object in turn */
2814                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2815                         cfs_list_del_init(&loi->loi_hp_ready_item);
2816                 if (!cfs_list_empty(&loi->loi_ready_item))
2817                         cfs_list_del_init(&loi->loi_ready_item);
2818                 if (!cfs_list_empty(&loi->loi_write_item))
2819                         cfs_list_del_init(&loi->loi_write_item);
2820                 if (!cfs_list_empty(&loi->loi_read_item))
2821                         cfs_list_del_init(&loi->loi_read_item);
2822
2823                 loi_list_maint(cli, loi);
2824
2825                 /* osc_send_oap_rpc() returns 0 when make_ready tells it to
2826                  * back off.  llite's make_ready does this when it tries
2827                  * to lock a page queued for write that is already locked.
2828                  * We want to try sending RPCs from many objects, but we
2829                  * don't want to spin on repeated 0 returns.  */
2830                 if (race_counter == 10)
2831                         break;
2832         }
2833         EXIT;
2834 }
2835
2836 /* we're trying to queue a page in the osc so we're subject to the
2837  * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
2838  * If the osc's queued pages are already at that limit, then we want to sleep
2839  * until there is space in the osc's queue for us.  We also may be waiting for
2840  * write credits from the OST if there are RPCs in flight that may return some
2841  * before we fall back to sync writes.
2842  *
2843  * We need this to know that our allocation was granted in the presence of signals */
2844 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
2845 {
2846         int rc;
2847         ENTRY;
2848         client_obd_list_lock(&cli->cl_loi_list_lock);
2849         rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
2850         client_obd_list_unlock(&cli->cl_loi_list_lock);
2851         RETURN(rc);
2852 }
2853
2854 /**
2855  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2856  * is available.
2857  */
2858 int osc_enter_cache_try(const struct lu_env *env,
2859                         struct client_obd *cli, struct lov_oinfo *loi,
2860                         struct osc_async_page *oap, int transient)
2861 {
2862         int has_grant;
2863
2864         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2865         if (has_grant) {
2866                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2867                 if (transient) {
2868                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2869                         cfs_atomic_inc(&obd_dirty_transit_pages);
2870                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2871                 }
2872         }
2873         return has_grant;
2874 }
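/* Worked example: each cached page consumes CFS_PAGE_SIZE bytes of grant
 * via osc_consume_write_grant(), so if cl_avail_grant covers exactly three
 * pages, three consecutive calls succeed and the fourth returns 0, pushing
 * the caller into osc_enter_cache()'s blocking path below. */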
2875
2876 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2877  * grant or cache space. */
2878 static int osc_enter_cache(const struct lu_env *env,
2879                            struct client_obd *cli, struct lov_oinfo *loi,
2880                            struct osc_async_page *oap)
2881 {
2882         struct osc_cache_waiter ocw;
2883         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2884
2885         ENTRY;
2886
2887         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2888                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2889                cli->cl_dirty_max, obd_max_dirty_pages,
2890                cli->cl_lost_grant, cli->cl_avail_grant);
2891
2892         /* force the caller to try sync io.  this can jump the list
2893          * of queued writes and create a discontiguous rpc stream */
2894         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2895             cli->cl_dirty_max < CFS_PAGE_SIZE     ||
2896             cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2897                 RETURN(-EDQUOT);
2898
2899         /* Hopefully normal case - cache space and write credits available */
2900         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2901             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2902             osc_enter_cache_try(env, cli, loi, oap, 0))
2903                 RETURN(0);
2904
2905         /* It is safe to block as a cache waiter as long as there is grant
2906          * space available, or there is hope of additional grant being
2907          * returned when an in-flight write completes.  Using the writeback
2908          * cache when possible is preferable to sending the data synchronously,
2909          * because write pages can then be merged into large requests.
2910          * Adding this cache waiter causes pending write
2911          * pages to be sent immediately. */
2912         if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
2913                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2914                 cfs_waitq_init(&ocw.ocw_waitq);
2915                 ocw.ocw_oap = oap;
2916                 ocw.ocw_rc = 0;
2917
2918                 loi_list_maint(cli, loi);
2919                 osc_check_rpcs(env, cli);
2920                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2921
2922                 CDEBUG(D_CACHE, "sleeping for cache space\n");
2923                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
2924
2925                 client_obd_list_lock(&cli->cl_loi_list_lock);
2926                 if (!cfs_list_empty(&ocw.ocw_entry)) {
2927                         cfs_list_del(&ocw.ocw_entry);
2928                         RETURN(-EINTR);
2929                 }
2930                 RETURN(ocw.ocw_rc);
2931         }
2932
2933         RETURN(-EDQUOT);
2934 }
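/* Summarizing the outcomes above: -EDQUOT forces the caller back to sync
 * i/o (fail check, tiny cl_dirty_max, or force_sync); 0 means the page was
 * accounted against grant and the dirty limits; -EINTR means a signal
 * arrived while waiting; otherwise ocw_rc carries the waker's verdict. */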
2935
2936
2937 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2938                         struct lov_oinfo *loi, cfs_page_t *page,
2939                         obd_off offset, const struct obd_async_page_ops *ops,
2940                         void *data, void **res, int nocache,
2941                         struct lustre_handle *lockh)
2942 {
2943         struct osc_async_page *oap;
2944
2945         ENTRY;
2946
2947         if (!page)
2948                 return cfs_size_round(sizeof(*oap));
2949
2950         oap = *res;
2951         oap->oap_magic = OAP_MAGIC;
2952         oap->oap_cli = &exp->exp_obd->u.cli;
2953         oap->oap_loi = loi;
2954
2955         oap->oap_caller_ops = ops;
2956         oap->oap_caller_data = data;
2957
2958         oap->oap_page = page;
2959         oap->oap_obj_off = offset;
2960         if (!client_is_remote(exp) &&
2961             cfs_capable(CFS_CAP_SYS_RESOURCE))
2962                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2963
2964         LASSERT(!(offset & ~CFS_PAGE_MASK));
2965
2966         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2967         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2968         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2969         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2970
2971         cfs_spin_lock_init(&oap->oap_lock);
2972         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2973         RETURN(0);
2974 }
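/* Illustrative two-phase usage by a caller (hypothetical sketch, not code
 * from this file): call once with a NULL page to learn the cookie size,
 * then again with the real page to initialize the oap:
 *
 *      void *cookie = NULL;
 *      int size = osc_prep_async_page(exp, lsm, loi, NULL, 0,
 *                                     ops, data, NULL, 0, NULL);
 *      OBD_ALLOC(cookie, size);
 *      if (cookie != NULL)
 *              rc = osc_prep_async_page(exp, lsm, loi, page, offset,
 *                                       ops, data, &cookie, 0, NULL);
 */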
2975
2976 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2977                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2978                        struct osc_async_page *oap, int cmd, int off,
2979                        int count, obd_flag brw_flags, enum async_flags async_flags)
2980 {
2981         struct client_obd *cli = &exp->exp_obd->u.cli;
2982         int rc = 0;
2983         ENTRY;
2984
2985         if (oap->oap_magic != OAP_MAGIC)
2986                 RETURN(-EINVAL);
2987
2988         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2989                 RETURN(-EIO);
2990
2991         if (!cfs_list_empty(&oap->oap_pending_item) ||
2992             !cfs_list_empty(&oap->oap_urgent_item) ||
2993             !cfs_list_empty(&oap->oap_rpc_item))
2994                 RETURN(-EBUSY);
2995
2996         /* check if the file's owner/group is over quota */
2997         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
2998                 struct cl_object *obj;
2999                 struct cl_attr    attr; /* XXX put attr into thread info */
3000                 unsigned int qid[MAXQUOTAS];
3001
3002                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3003
3004                 cl_object_attr_lock(obj);
3005                 rc = cl_object_attr_get(env, obj, &attr);
3006                 cl_object_attr_unlock(obj);
3007
3008                 qid[USRQUOTA] = attr.cat_uid;
3009                 qid[GRPQUOTA] = attr.cat_gid;
3010                 if (rc == 0 &&
3011                     lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
3012                         rc = -EDQUOT;
3013                 if (rc)
3014                         RETURN(rc);
3015         }
3016
3017         if (loi == NULL)
3018                 loi = lsm->lsm_oinfo[0];
3019
3020         client_obd_list_lock(&cli->cl_loi_list_lock);
3021
3022         LASSERT(off + count <= CFS_PAGE_SIZE);
3023         oap->oap_cmd = cmd;
3024         oap->oap_page_off = off;
3025         oap->oap_count = count;
3026         oap->oap_brw_flags = brw_flags;
3027         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3028         if (cfs_memory_pressure_get())
3029                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3030         cfs_spin_lock(&oap->oap_lock);
3031         oap->oap_async_flags = async_flags;
3032         cfs_spin_unlock(&oap->oap_lock);
3033
3034         if (cmd & OBD_BRW_WRITE) {
3035                 rc = osc_enter_cache(env, cli, loi, oap);
3036                 if (rc) {
3037                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3038                         RETURN(rc);
3039                 }
3040         }
3041
3042         osc_oap_to_pending(oap);
3043         loi_list_maint(cli, loi);
3044
3045         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3046                   cmd);
3047
3048         osc_check_rpcs(env, cli);
3049         client_obd_list_unlock(&cli->cl_loi_list_lock);
3050
3051         RETURN(0);
3052 }
3053
3054 /* aka (~was & now & flag), but this is more clear :) */
3055 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
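/* e.g. SETTING(ASYNC_READY, ASYNC_READY | ASYNC_URGENT, ASYNC_URGENT) is
 * true because the ASYNC_URGENT bit is newly set, whereas
 * SETTING(ASYNC_URGENT, ASYNC_URGENT, ASYNC_URGENT) is false. */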
3056
3057 int osc_set_async_flags_base(struct client_obd *cli,
3058                              struct lov_oinfo *loi, struct osc_async_page *oap,
3059                              obd_flag async_flags)
3060 {
3061         struct loi_oap_pages *lop;
3062         int flags = 0;
3063         ENTRY;
3064
3065         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3066
3067         if (oap->oap_cmd & OBD_BRW_WRITE) {
3068                 lop = &loi->loi_write_lop;
3069         } else {
3070                 lop = &loi->loi_read_lop;
3071         }
3072
3073         if ((oap->oap_async_flags & async_flags) == async_flags)
3074                 RETURN(0);
3075
3076         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3077                 flags |= ASYNC_READY;
3078
3079         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3080             cfs_list_empty(&oap->oap_rpc_item)) {
3081                 if (oap->oap_async_flags & ASYNC_HP)
3082                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3083                 else
3084                         cfs_list_add_tail(&oap->oap_urgent_item,
3085                                           &lop->lop_urgent);
3086                 flags |= ASYNC_URGENT;
3087                 loi_list_maint(cli, loi);
3088         }
3089         cfs_spin_lock(&oap->oap_lock);
3090         oap->oap_async_flags |= flags;
3091         cfs_spin_unlock(&oap->oap_lock);
3092
3093         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3094                         oap->oap_async_flags);
3095         RETURN(0);
3096 }
3097
3098 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3099                             struct lov_oinfo *loi, struct osc_async_page *oap)
3100 {
3101         struct client_obd *cli = &exp->exp_obd->u.cli;
3102         struct loi_oap_pages *lop;
3103         int rc = 0;
3104         ENTRY;
3105
3106         if (oap->oap_magic != OAP_MAGIC)
3107                 RETURN(-EINVAL);
3108
3109         if (loi == NULL)
3110                 loi = lsm->lsm_oinfo[0];
3111
3112         if (oap->oap_cmd & OBD_BRW_WRITE) {
3113                 lop = &loi->loi_write_lop;
3114         } else {
3115                 lop = &loi->loi_read_lop;
3116         }
3117
3118         client_obd_list_lock(&cli->cl_loi_list_lock);
3119
3120         if (!cfs_list_empty(&oap->oap_rpc_item))
3121                 GOTO(out, rc = -EBUSY);
3122
3123         osc_exit_cache(cli, oap, 0);
3124         osc_wake_cache_waiters(cli);
3125
3126         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3127                 cfs_list_del_init(&oap->oap_urgent_item);
3128                 cfs_spin_lock(&oap->oap_lock);
3129                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3130                 cfs_spin_unlock(&oap->oap_lock);
3131         }
3132         if (!cfs_list_empty(&oap->oap_pending_item)) {
3133                 cfs_list_del_init(&oap->oap_pending_item);
3134                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3135         }
3136         loi_list_maint(cli, loi);
3137         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3138 out:
3139         client_obd_list_unlock(&cli->cl_loi_list_lock);
3140         RETURN(rc);
3141 }
3142
3143 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3144                                         struct ldlm_enqueue_info *einfo)
3145 {
3146         void *data = einfo->ei_cbdata;
3147         int set = 0;
3148
3149         LASSERT(lock != NULL);
3150         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3151         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3152         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3153         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3154
3155         lock_res_and_lock(lock);
3156         cfs_spin_lock(&osc_ast_guard);
3157
3158         if (lock->l_ast_data == NULL)
3159                 lock->l_ast_data = data;
3160         if (lock->l_ast_data == data)
3161                 set = 1;
3162
3163         cfs_spin_unlock(&osc_ast_guard);
3164         unlock_res_and_lock(lock);
3165
3166         return set;
3167 }
3168
3169 static int osc_set_data_with_check(struct lustre_handle *lockh,
3170                                    struct ldlm_enqueue_info *einfo)
3171 {
3172         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3173         int set = 0;
3174
3175         if (lock != NULL) {
3176                 set = osc_set_lock_data_with_check(lock, einfo);
3177                 LDLM_LOCK_PUT(lock);
3178         } else
3179                 CERROR("lockh %p, data %p - client evicted?\n",
3180                        lockh, einfo->ei_cbdata);
3181         return set;
3182 }
3183
3184 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3185                              ldlm_iterator_t replace, void *data)
3186 {
3187         struct ldlm_res_id res_id;
3188         struct obd_device *obd = class_exp2obd(exp);
3189
3190         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3191         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3192         return 0;
3193 }
3194
3195 /* find any ldlm lock of the inode in osc
3196  * return 0    if no lock is found
3197  *        1    if one is found
3198  *      < 0    on error */
3199 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3200                            ldlm_iterator_t replace, void *data)
3201 {
3202         struct ldlm_res_id res_id;
3203         struct obd_device *obd = class_exp2obd(exp);
3204         int rc = 0;
3205
3206         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3207         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3208         if (rc == LDLM_ITER_STOP)
3209                 return(1);
3210         if (rc == LDLM_ITER_CONTINUE)
3211                 return(0);
3212         return(rc);
3213 }
3214
3215 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3216                             obd_enqueue_update_f upcall, void *cookie,
3217                             int *flags, int rc)
3218 {
3219         int intent = *flags & LDLM_FL_HAS_INTENT;
3220         ENTRY;
3221
3222         if (intent) {
3223                 /* The request was created before ldlm_cli_enqueue call. */
3224                 if (rc == ELDLM_LOCK_ABORTED) {
3225                         struct ldlm_reply *rep;
3226                         rep = req_capsule_server_get(&req->rq_pill,
3227                                                      &RMF_DLM_REP);
3228
3229                         LASSERT(rep != NULL);
3230                         if (rep->lock_policy_res1)
3231                                 rc = rep->lock_policy_res1;
3232                 }
3233         }
3234
3235         if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
3236                 *flags |= LDLM_FL_LVB_READY;
3237                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3238                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3239         }
3240
3241         /* Call the update callback. */
3242         rc = (*upcall)(cookie, rc);
3243         RETURN(rc);
3244 }
3245
3246 static int osc_enqueue_interpret(const struct lu_env *env,
3247                                  struct ptlrpc_request *req,
3248                                  struct osc_enqueue_args *aa, int rc)
3249 {
3250         struct ldlm_lock *lock;
3251         struct lustre_handle handle;
3252         __u32 mode;
3253
3254         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3255          * might be freed anytime after lock upcall has been called. */
3256         lustre_handle_copy(&handle, aa->oa_lockh);
3257         mode = aa->oa_ei->ei_mode;
3258
3259         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3260          * be valid. */
3261         lock = ldlm_handle2lock(&handle);
3262
3263         /* Take an additional reference so that a blocking AST that
3264          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3265          * to arrive after an upcall has been executed by
3266          * osc_enqueue_fini(). */
3267         ldlm_lock_addref(&handle, mode);
3268
3269         /* Let the CP AST grant the lock first. */
3270         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3271
3272         /* Complete obtaining the lock procedure. */
3273         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3274                                    mode, aa->oa_flags, aa->oa_lvb,
3275                                    sizeof(*aa->oa_lvb), &handle, rc);
3276         /* Complete osc stuff. */
3277         rc = osc_enqueue_fini(req, aa->oa_lvb,
3278                               aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
3279
3280         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3281
3282         /* Release the lock for async request. */
3283         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3284                 /*
3285                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3286                  * not already released by
3287                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3288                  */
3289                 ldlm_lock_decref(&handle, mode);
3290
3291         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3292                  aa->oa_lockh, req, aa);
3293         ldlm_lock_decref(&handle, mode);
3294         LDLM_LOCK_PUT(lock);
3295         return rc;
3296 }
3297
3298 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3299                         struct lov_oinfo *loi, int flags,
3300                         struct ost_lvb *lvb, __u32 mode, int rc)
3301 {
3302         if (rc == ELDLM_OK) {
3303                 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3304                 __u64 tmp;
3305
3306                 LASSERT(lock != NULL);
3307                 loi->loi_lvb = *lvb;
3308                 tmp = loi->loi_lvb.lvb_size;
3309                 /* Extend KMS up to the end of this lock and no further.
3310                  * A lock on [x, y] means a KMS of up to y + 1 bytes! */
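                /* e.g. (illustrative values) a lock on [0, 1048575] caps
                 * tmp at 1048576, so even if the LVB reports a larger file
                 * size, kms is raised only to the end of this lock. */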
3311                 if (tmp > lock->l_policy_data.l_extent.end)
3312                         tmp = lock->l_policy_data.l_extent.end + 1;
3313                 if (tmp >= loi->loi_kms) {
3314                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3315                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3316                         loi_kms_set(loi, tmp);
3317                 } else {
3318                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3319                                    LPU64"; leaving kms="LPU64", end="LPU64,
3320                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3321                                    lock->l_policy_data.l_extent.end);
3322                 }
3323                 ldlm_lock_allow_match(lock);
3324                 LDLM_LOCK_PUT(lock);
3325         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3326                 loi->loi_lvb = *lvb;
3327                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3328                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3329                 rc = ELDLM_OK;
3330         }
3331 }
3332 EXPORT_SYMBOL(osc_update_enqueue);
3333
3334 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3335
3336 /* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
3337  * from the 2nd OSC before a lock from the 1st one.  This does not deadlock
3338  * with other synchronous requests, but holding some locks while trying to
3339  * obtain others may take a considerable amount of time in case of OST
3340  * failure; and when other sync requests cannot get a lock released by such
3341  * a client, that client is evicted from the cluster -- such scenarios make
3342  * life difficult, so release locks just after they are obtained. */
3343 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3344                      int *flags, ldlm_policy_data_t *policy,
3345                      struct ost_lvb *lvb, int kms_valid,
3346                      obd_enqueue_update_f upcall, void *cookie,
3347                      struct ldlm_enqueue_info *einfo,
3348                      struct lustre_handle *lockh,
3349                      struct ptlrpc_request_set *rqset, int async)
3350 {
3351         struct obd_device *obd = exp->exp_obd;
3352         struct ptlrpc_request *req = NULL;
3353         int intent = *flags & LDLM_FL_HAS_INTENT;
3354         ldlm_mode_t mode;
3355         int rc;
3356         ENTRY;
3357
3358         /* Filesystem lock extents are extended to page boundaries so that
3359          * dealing with the page cache is a little smoother.  */
3360         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3361         policy->l_extent.end |= ~CFS_PAGE_MASK;
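        /* Worked example, assuming 4096-byte pages (~CFS_PAGE_MASK == 0xfff):
         * a request for bytes [5000, 12000] is widened to [4096, 12287],
         * i.e. start rounded down and end rounded up to page boundaries. */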
3362
3363         /*
3364          * kms is not valid when either object is completely fresh (so that no
3365          * locks are cached), or object was evicted. In the latter case cached
3366          * lock cannot be used, because it would prime inode state with
3367          * potentially stale LVB.
3368          */
3369         if (!kms_valid)
3370                 goto no_match;
3371
3372         /* Next, search for already existing extent locks that will cover us */
3373         /* If we're trying to read, we also search for an existing PW lock.  The
3374          * VFS and page cache already protect us locally, so lots of readers/
3375          * writers can share a single PW lock.
3376          *
3377          * There are problems with conversion deadlocks, so instead of
3378          * converting a read lock to a write lock, we'll just enqueue a new
3379          * one.
3380          *
3381          * At some point we should cancel the read lock instead of making them
3382          * send us a blocking callback, but there are problems with canceling
3383          * locks out from other users right now, too. */
3384         mode = einfo->ei_mode;
3385         if (einfo->ei_mode == LCK_PR)
3386                 mode |= LCK_PW;
3387         mode = ldlm_lock_match(obd->obd_namespace,
3388                                *flags | LDLM_FL_LVB_READY, res_id,
3389                                einfo->ei_type, policy, mode, lockh, 0);
3390         if (mode) {
3391                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3392
3393                 if (osc_set_lock_data_with_check(matched, einfo)) {
3394                         /* addref the lock only for non-async requests when
3395                          * a PW lock is matched although we asked for PR. */
3396                         if (!rqset && einfo->ei_mode != mode)
3397                                 ldlm_lock_addref(lockh, LCK_PR);
3398                         if (intent) {
3399                                 /* I would like to be able to ASSERT here that
3400                                  * rss <= kms, but I can't, for reasons which
3401                                  * are explained in lov_enqueue() */
3402                         }
3403
3404                         /* We already have a lock, and it's referenced */
3405                         (*upcall)(cookie, ELDLM_OK);
3406
3407                         /* For async requests, decref the lock. */
3408                         if (einfo->ei_mode != mode)
3409                                 ldlm_lock_decref(lockh, LCK_PW);
3410                         else if (rqset)
3411                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3412                         LDLM_LOCK_PUT(matched);
3413                         RETURN(ELDLM_OK);
3414                 } else
3415                         ldlm_lock_decref(lockh, mode);
3416                 LDLM_LOCK_PUT(matched);
3417         }
3418
3419  no_match:
3420         if (intent) {
3421                 CFS_LIST_HEAD(cancels);
3422                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3423                                            &RQF_LDLM_ENQUEUE_LVB);
3424                 if (req == NULL)
3425                         RETURN(-ENOMEM);
3426
3427                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3428                 if (rc) {
3429                         ptlrpc_request_free(req);
3430                         RETURN(rc);
3431                 }
3432
3433                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3434                                      sizeof *lvb);
3435                 ptlrpc_request_set_replen(req);
3436         }
3437
3438         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3439         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3440
3441         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3442                               sizeof(*lvb), lockh, async);
3443         if (rqset) {
3444                 if (!rc) {
3445                         struct osc_enqueue_args *aa;
3446                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3447                         aa = ptlrpc_req_async_args(req);
3448                         aa->oa_ei = einfo;
3449                         aa->oa_exp = exp;
3450                         aa->oa_flags  = flags;
3451                         aa->oa_upcall = upcall;
3452                         aa->oa_cookie = cookie;
3453                         aa->oa_lvb    = lvb;
3454                         aa->oa_lockh  = lockh;
3455
3456                         req->rq_interpret_reply =
3457                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3458                         if (rqset == PTLRPCD_SET)
3459                                 ptlrpcd_add_req(req, PSCOPE_OTHER);
3460                         else
3461                                 ptlrpc_set_add_req(rqset, req);
3462                 } else if (intent) {
3463                         ptlrpc_req_finished(req);
3464                 }
3465                 RETURN(rc);
3466         }
3467
3468         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
3469         if (intent)
3470                 ptlrpc_req_finished(req);
3471
3472         RETURN(rc);
3473 }
3474
3475 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3476                        struct ldlm_enqueue_info *einfo,
3477                        struct ptlrpc_request_set *rqset)
3478 {
3479         struct ldlm_res_id res_id;
3480         int rc;
3481         ENTRY;
3482
3483         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3484                            oinfo->oi_md->lsm_object_seq, &res_id);
3485
3486         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3487                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3488                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3489                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3490                               rqset, rqset != NULL);
3491         RETURN(rc);
3492 }
3493
3494 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3495                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3496                    int *flags, void *data, struct lustre_handle *lockh,
3497                    int unref)
3498 {
3499         struct obd_device *obd = exp->exp_obd;
3500         int lflags = *flags;
3501         ldlm_mode_t rc;
3502         ENTRY;
3503
3504         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3505                 RETURN(-EIO);
3506
3507         /* Filesystem lock extents are extended to page boundaries so that
3508          * dealing with the page cache is a little smoother */
3509         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3510         policy->l_extent.end |= ~CFS_PAGE_MASK;
3511
3512         /* Next, search for already existing extent locks that will cover us */
3513         /* If we're trying to read, we also search for an existing PW lock.  The
3514          * VFS and page cache already protect us locally, so lots of readers/
3515          * writers can share a single PW lock. */
3516         rc = mode;
3517         if (mode == LCK_PR)
3518                 rc |= LCK_PW;
3519         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3520                              res_id, type, policy, rc, lockh, unref);
3521         if (rc) {
3522                 if (data != NULL) {
3523                         if (!osc_set_data_with_check(lockh, data)) {
3524                                 if (!(lflags & LDLM_FL_TEST_LOCK))
3525                                         ldlm_lock_decref(lockh, rc);
3526                                 RETURN(0);
3527                         }
3528                 }
3529                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3530                         ldlm_lock_addref(lockh, LCK_PR);
3531                         ldlm_lock_decref(lockh, LCK_PW);
3532                 }
3533                 RETURN(rc);
3534         }
3535         RETURN(rc);
3536 }
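/* Note the reference juggling above: a request for LCK_PR may match an
 * existing LCK_PW lock, in which case ldlm_lock_match() took a PW
 * reference; it is swapped for a PR reference so callers always decref
 * with the mode they originally asked for. */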
3537
3538 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3539 {
3540         ENTRY;
3541
3542         if (unlikely(mode == LCK_GROUP))
3543                 ldlm_lock_decref_and_cancel(lockh, mode);
3544         else
3545                 ldlm_lock_decref(lockh, mode);
3546
3547         RETURN(0);
3548 }
3549
3550 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3551                       __u32 mode, struct lustre_handle *lockh)
3552 {
3553         ENTRY;
3554         RETURN(osc_cancel_base(lockh, mode));
3555 }
3556
3557 static int osc_cancel_unused(struct obd_export *exp,
3558                              struct lov_stripe_md *lsm,
3559                              ldlm_cancel_flags_t flags,
3560                              void *opaque)
3561 {
3562         struct obd_device *obd = class_exp2obd(exp);
3563         struct ldlm_res_id res_id, *resp = NULL;
3564
3565         if (lsm != NULL) {
3566                 resp = osc_build_res_name(lsm->lsm_object_id,
3567                                           lsm->lsm_object_seq, &res_id);
3568         }
3569
3570         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3571 }
3572
3573 static int osc_statfs_interpret(const struct lu_env *env,
3574                                 struct ptlrpc_request *req,
3575                                 struct osc_async_args *aa, int rc)
3576 {
3577         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3578         struct obd_statfs *msfs;
3579         __u64 used;
3580         ENTRY;
3581
3582         if (rc == -EBADR)
3583                 /* The request has in fact never been sent
3584                  * due to issues at a higher level (LOV).
3585                  * Exit immediately since the caller is
3586                  * aware of the problem and takes care
3587                  * of the clean up */
3588                  RETURN(rc);
3589
3590         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3591             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3592                 GOTO(out, rc = 0);
3593
3594         if (rc != 0)
3595                 GOTO(out, rc);
3596
3597         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3598         if (msfs == NULL) {
3599                 GOTO(out, rc = -EPROTO);
3600         }
3601
3602         /* Reinitialize the RDONLY and DEGRADED flags at the client
3603          * on each statfs, so they don't stay set permanently. */
3604         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3605
3606         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3607                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3608         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3609                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3610
3611         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3612                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3613         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3614                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3615
3616         /* Add a bit of hysteresis so this flag isn't continually flapping,
3617          * and ensure that new files don't get extremely fragmented due to
3618          * only a small amount of available space in the filesystem.
3619          * We want to set the NOSPC flag when there is less than ~0.1% free
3620          * and clear it when there is at least ~0.2% free space, so:
3621          *                   avail < ~0.1% max          max = avail + used
3622          *            1025 * avail < avail + used       used = blocks - free
3623          *            1024 * avail < used
3624          *            1024 * avail < blocks - free
3625          *                   avail < ((blocks - free) >> 10)
3626          *
3627          * On a very large disk, say 16TB, 0.1% will be 16 GB.  We don't want
3628          * to lose that amount of space, so in those cases we report no space
3629          * left if there is less than 1 GB left.                        */
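        /* Concretely: when (os_blocks - os_bfree) >> 10 exceeds the 1 << 30
         * cap applied by min_t() below, the cap wins, so on huge filesystems
         * NOSPC triggers once os_bavail drops below 1 << 30 (or below 32
         * free inodes) rather than at the 0.1% point. */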
3630         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3631         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3632                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3633                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3634         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3635                 (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
3636                         cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
3637
3638         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3639
3640         *aa->aa_oi->oi_osfs = *msfs;
3641 out:
3642         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3643         RETURN(rc);
3644 }
3645
3646 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3647                             __u64 max_age, struct ptlrpc_request_set *rqset)
3648 {
3649         struct ptlrpc_request *req;
3650         struct osc_async_args *aa;
3651         int                    rc;
3652         ENTRY;
3653
3654         /* We could possibly pass max_age in the request (as an absolute
3655          * timestamp or a "seconds.usec ago") so the target can avoid doing
3656          * extra calls into the filesystem if that isn't necessary (e.g.
3657          * during mount that would help a bit).  Having relative timestamps
3658          * is not so great if request processing is slow, while absolute
3659          * timestamps are not ideal because they need time synchronization. */
3660         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3661         if (req == NULL)
3662                 RETURN(-ENOMEM);
3663
3664         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3665         if (rc) {
3666                 ptlrpc_request_free(req);
3667                 RETURN(rc);
3668         }
3669         ptlrpc_request_set_replen(req);
3670         req->rq_request_portal = OST_CREATE_PORTAL;
3671         ptlrpc_at_set_req_timeout(req);
3672
3673         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3674                 /* procfs requests should not wait for a reply, to avoid deadlock */
3675                 req->rq_no_resend = 1;
3676                 req->rq_no_delay = 1;
3677         }
3678
3679         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3680         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3681         aa = ptlrpc_req_async_args(req);
3682         aa->aa_oi = oinfo;
3683
3684         ptlrpc_set_add_req(rqset, req);
3685         RETURN(0);
3686 }
3687
3688 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3689                       __u64 max_age, __u32 flags)
3690 {
3691         struct obd_statfs     *msfs;
3692         struct ptlrpc_request *req;
3693         struct obd_import     *imp = NULL;
3694         int rc;
3695         ENTRY;
3696
3697         /* Since the request might also come from lprocfs, we need to
3698          * sync this with client_disconnect_export() (bug 15684) */
3699         cfs_down_read(&obd->u.cli.cl_sem);
3700         if (obd->u.cli.cl_import)
3701                 imp = class_import_get(obd->u.cli.cl_import);
3702         cfs_up_read(&obd->u.cli.cl_sem);
3703         if (!imp)
3704                 RETURN(-ENODEV);
3705
3706         /* We could possibly pass max_age in the request (as an absolute
3707          * timestamp or a "seconds.usec ago") so the target can avoid doing
3708          * extra calls into the filesystem if that isn't necessary (e.g.
3709          * during mount that would help a bit).  Having relative timestamps
3710          * is not so great if request processing is slow, while absolute
3711          * timestamps are not ideal because they need time synchronization. */
3712         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3713
3714         class_import_put(imp);
3715
3716         if (req == NULL)
3717                 RETURN(-ENOMEM);
3718
3719         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3720         if (rc) {
3721                 ptlrpc_request_free(req);
3722                 RETURN(rc);
3723         }
3724         ptlrpc_request_set_replen(req);
3725         req->rq_request_portal = OST_CREATE_PORTAL;
3726         ptlrpc_at_set_req_timeout(req);
3727
3728         if (flags & OBD_STATFS_NODELAY) {
3729                 /* procfs requests should not wait for a reply, to avoid deadlock */
3730                 req->rq_no_resend = 1;
3731                 req->rq_no_delay = 1;
3732         }
3733
3734         rc = ptlrpc_queue_wait(req);
3735         if (rc)
3736                 GOTO(out, rc);
3737
3738         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3739         if (msfs == NULL) {
3740                 GOTO(out, rc = -EPROTO);
3741         }
3742
3743         *osfs = *msfs;
3744
3745         EXIT;
3746  out:
3747         ptlrpc_req_finished(req);
3748         return rc;
3749 }
3750
3751 /* Retrieve object striping information.
3752  *
3753  * @lump is a pointer to a user-space struct whose lmm_stripe_count indicates
3754  * the maximum number of OST indices which will fit in the user buffer.
3755  * lmm_magic must be LOV_USER_MAGIC_V1 or _V3 (we only use 1 slot here).
3756  */
3757 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3758 {
3759         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3760         struct lov_user_md_v3 lum, *lumk;
3761         struct lov_user_ost_data_v1 *lmm_objects;
3762         int rc = 0, lum_size;
3763         ENTRY;
3764
3765         if (!lsm)
3766                 RETURN(-ENODATA);
3767
3768         /* we only need the header part from user space to get lmm_magic and
3769          * lmm_stripe_count (the header part is common to v1 and v3) */
3770         lum_size = sizeof(struct lov_user_md_v1);
3771         if (cfs_copy_from_user(&lum, lump, lum_size))
3772                 RETURN(-EFAULT);
3773
3774         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3775             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3776                 RETURN(-EINVAL);
3777
3778         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3779         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3780         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3781         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3782
3783         /* we can use lov_mds_md_size() to compute lum_size
3784          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3785         if (lum.lmm_stripe_count > 0) {
3786                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3787                 OBD_ALLOC(lumk, lum_size);
3788                 if (!lumk)
3789                         RETURN(-ENOMEM);
3790
3791                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3792                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3793                 else
3794                         lmm_objects = &(lumk->lmm_objects[0]);
3795                 lmm_objects->l_object_id = lsm->lsm_object_id;
3796         } else {
3797                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3798                 lumk = &lum;
3799         }
3800
3801         lumk->lmm_object_id = lsm->lsm_object_id;
3802         lumk->lmm_object_seq = lsm->lsm_object_seq;
3803         lumk->lmm_stripe_count = 1;
3804
3805         if (cfs_copy_to_user(lump, lumk, lum_size))
3806                 rc = -EFAULT;
3807
3808         if (lumk != &lum)
3809                 OBD_FREE(lumk, lum_size);
3810
3811         RETURN(rc);
3812 }
3813
3814
3815 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3816                          void *karg, void *uarg)
3817 {
3818         struct obd_device *obd = exp->exp_obd;
3819         struct obd_ioctl_data *data = karg;
3820         int err = 0;
3821         ENTRY;
3822
3823         if (!cfs_try_module_get(THIS_MODULE)) {
3824                 CERROR("Can't get module. Is it alive?\n");
3825                 return -EINVAL;
3826         }
3827         switch (cmd) {
3828         case OBD_IOC_LOV_GET_CONFIG: {
3829                 char *buf;
3830                 struct lov_desc *desc;
3831                 struct obd_uuid uuid;
3832
3833                 buf = NULL;
3834                 len = 0;
3835                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3836                         GOTO(out, err = -EINVAL);
3837
3838                 data = (struct obd_ioctl_data *)buf;
3839
3840                 if (sizeof(*desc) > data->ioc_inllen1) {
3841                         obd_ioctl_freedata(buf, len);
3842                         GOTO(out, err = -EINVAL);
3843                 }
3844
3845                 if (data->ioc_inllen2 < sizeof(uuid)) {
3846                         obd_ioctl_freedata(buf, len);
3847                         GOTO(out, err = -EINVAL);
3848                 }
3849
3850                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3851                 desc->ld_tgt_count = 1;
3852                 desc->ld_active_tgt_count = 1;
3853                 desc->ld_default_stripe_count = 1;
3854                 desc->ld_default_stripe_size = 0;
3855                 desc->ld_default_stripe_offset = 0;
3856                 desc->ld_pattern = 0;
3857                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3858
3859                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3860
3861                 err = cfs_copy_to_user((void *)uarg, buf, len);
3862                 if (err)
3863                         err = -EFAULT;
3864                 obd_ioctl_freedata(buf, len);
3865                 GOTO(out, err);
3866         }
3867         case LL_IOC_LOV_SETSTRIPE:
3868                 err = obd_alloc_memmd(exp, karg);
3869                 if (err > 0)
3870                         err = 0;
3871                 GOTO(out, err);
3872         case LL_IOC_LOV_GETSTRIPE:
3873                 err = osc_getstripe(karg, uarg);
3874                 GOTO(out, err);
3875         case OBD_IOC_CLIENT_RECOVER:
3876                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3877                                             data->ioc_inlbuf1);
3878                 if (err > 0)
3879                         err = 0;
3880                 GOTO(out, err);
3881         case IOC_OSC_SET_ACTIVE:
3882                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3883                                                data->ioc_offset);
3884                 GOTO(out, err);
3885         case OBD_IOC_POLL_QUOTACHECK:
3886                 err = lquota_poll_check(quota_interface, exp,
3887                                         (struct if_quotacheck *)karg);
3888                 GOTO(out, err);
3889         case OBD_IOC_PING_TARGET:
3890                 err = ptlrpc_obd_ping(obd);
3891                 GOTO(out, err);
3892         default:
3893                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3894                        cmd, cfs_curproc_comm());
3895                 GOTO(out, err = -ENOTTY);
3896         }
3897 out:
3898         cfs_module_put(THIS_MODULE);
3899         return err;
3900 }
3901
3902 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3903                         void *key, __u32 *vallen, void *val,
3904                         struct lov_stripe_md *lsm)
3905 {
3906         ENTRY;
3907         if (!vallen || !val)
3908                 RETURN(-EFAULT);
3909
3910         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3911                 __u32 *stripe = val;
3912                 *vallen = sizeof(*stripe);
3913                 *stripe = 0;
3914                 RETURN(0);
3915         } else if (KEY_IS(KEY_LAST_ID)) {
3916                 struct ptlrpc_request *req;
3917                 obd_id                *reply;
3918                 char                  *tmp;
3919                 int                    rc;
3920
3921                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3922                                            &RQF_OST_GET_INFO_LAST_ID);
3923                 if (req == NULL)
3924                         RETURN(-ENOMEM);
3925
3926                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3927                                      RCL_CLIENT, keylen);
3928                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3929                 if (rc) {
3930                         ptlrpc_request_free(req);
3931                         RETURN(rc);
3932                 }
3933
3934                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3935                 memcpy(tmp, key, keylen);
3936
3937                 req->rq_no_delay = req->rq_no_resend = 1;
3938                 ptlrpc_request_set_replen(req);
3939                 rc = ptlrpc_queue_wait(req);
3940                 if (rc)
3941                         GOTO(out, rc);
3942
3943                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3944                 if (reply == NULL)
3945                         GOTO(out, rc = -EPROTO);
3946
3947                 *((obd_id *)val) = *reply;
3948         out:
3949                 ptlrpc_req_finished(req);
3950                 RETURN(rc);
3951         } else if (KEY_IS(KEY_FIEMAP)) {
3952                 struct ptlrpc_request *req;
3953                 struct ll_user_fiemap *reply;
3954                 char *tmp;
3955                 int rc;
3956
3957                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3958                                            &RQF_OST_GET_INFO_FIEMAP);
3959                 if (req == NULL)
3960                         RETURN(-ENOMEM);
3961
3962                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3963                                      RCL_CLIENT, keylen);
3964                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3965                                      RCL_CLIENT, *vallen);
3966                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3967                                      RCL_SERVER, *vallen);
3968
3969                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3970                 if (rc) {
3971                         ptlrpc_request_free(req);
3972                         RETURN(rc);
3973                 }
3974
3975                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3976                 memcpy(tmp, key, keylen);
3977                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3978                 memcpy(tmp, val, *vallen);
3979
3980                 ptlrpc_request_set_replen(req);
3981                 rc = ptlrpc_queue_wait(req);
3982                 if (rc)
3983                         GOTO(out1, rc);
3984
3985                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3986                 if (reply == NULL)
3987                         GOTO(out1, rc = -EPROTO);
3988
3989                 memcpy(val, reply, *vallen);
3990         out1:
3991                 ptlrpc_req_finished(req);
3992
3993                 RETURN(rc);
3994         }
3995
3996         RETURN(-EINVAL);
3997 }
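
/*
 * Usage sketch (hypothetical caller, for illustration only): fetching the
 * last allocated object id through the generic obd_get_info() entry point,
 * which dispatches to osc_get_info() above and issues an OST_GET_INFO RPC:
 *
 *      obd_id last_id = 0;
 *      __u32  vallen  = sizeof(last_id);
 *      int    rc;
 *
 *      rc = obd_get_info(exp, strlen(KEY_LAST_ID) + 1, KEY_LAST_ID,
 *                        &vallen, &last_id, NULL);
 *      if (rc == 0)
 *              CDEBUG(D_INFO, "last object id: "LPU64"\n", last_id);
 */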
3998
3999 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4000 {
4001         struct llog_ctxt *ctxt;
4002         int rc = 0;
4003         ENTRY;
4004
4005         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4006         if (ctxt) {
4007                 rc = llog_initiator_connect(ctxt);
4008                 llog_ctxt_put(ctxt);
4009         } else {
                /* XXX return an error? skip setting the flags below? */
4011         }
4012
4013         cfs_spin_lock(&imp->imp_lock);
4014         imp->imp_server_timeout = 1;
4015         imp->imp_pingable = 1;
4016         cfs_spin_unlock(&imp->imp_lock);
4017         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4018
4019         RETURN(rc);
4020 }
4021
4022 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4023                                           struct ptlrpc_request *req,
4024                                           void *aa, int rc)
4025 {
4026         ENTRY;
4027         if (rc != 0)
4028                 RETURN(rc);
4029
4030         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4031 }
4032
4033 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4034                               void *key, obd_count vallen, void *val,
4035                               struct ptlrpc_request_set *set)
4036 {
4037         struct ptlrpc_request *req;
4038         struct obd_device     *obd = exp->exp_obd;
4039         struct obd_import     *imp = class_exp2cliimp(exp);
4040         char                  *tmp;
4041         int                    rc;
4042         ENTRY;
4043
4044         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4045
4046         if (KEY_IS(KEY_NEXT_ID)) {
4047                 obd_id new_val;
4048                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4049
                if (vallen != sizeof(obd_id))
                        RETURN(-ERANGE);
                if (val == NULL)
                        RETURN(-EINVAL);
4057
                /* avoid a race between allocating a new object and setting
                 * the next id from the ll_sync thread */
4060                 cfs_spin_lock(&oscc->oscc_lock);
4061                 new_val = *((obd_id*)val) + 1;
4062                 if (new_val > oscc->oscc_next_id)
4063                         oscc->oscc_next_id = new_val;
4064                 cfs_spin_unlock(&oscc->oscc_lock);
4065                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4066                        exp->exp_obd->obd_name,
4067                        obd->u.cli.cl_oscc.oscc_next_id);
4068
4069                 RETURN(0);
4070         }
4071
4072         if (KEY_IS(KEY_CHECKSUM)) {
4073                 if (vallen != sizeof(int))
4074                         RETURN(-EINVAL);
4075                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4076                 RETURN(0);
4077         }
4078
4079         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4080                 sptlrpc_conf_client_adapt(obd);
4081                 RETURN(0);
4082         }
4083
4084         if (KEY_IS(KEY_FLUSH_CTX)) {
4085                 sptlrpc_import_flush_my_ctx(imp);
4086                 RETURN(0);
4087         }
4088
4089         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4090                 RETURN(-EINVAL);
4091
        /* We pass all other commands directly to the OST. Since nobody calls
           osc methods directly and everybody is supposed to go through LOV,
           we assume LOV has checked any invalid values for us. The only
           values recognised so far are evict_by_nid and mds_conn. Even if
           something bad gets through, we'd get an -EINVAL from the OST
           anyway. */
4098
4099         if (KEY_IS(KEY_GRANT_SHRINK))
4100                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4101         else
4102                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4103
4104         if (req == NULL)
4105                 RETURN(-ENOMEM);
4106
4107         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4108                              RCL_CLIENT, keylen);
4109         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4110                              RCL_CLIENT, vallen);
4111         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4112         if (rc) {
4113                 ptlrpc_request_free(req);
4114                 RETURN(rc);
4115         }
4116
4117         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4118         memcpy(tmp, key, keylen);
4119         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4120         memcpy(tmp, val, vallen);
4121
4122         if (KEY_IS(KEY_MDS_CONN)) {
4123                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4124
4125                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4126                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4127                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4128                 req->rq_no_delay = req->rq_no_resend = 1;
4129                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4130         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4131                 struct osc_grant_args *aa;
4132                 struct obdo *oa;
4133
4134                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4135                 aa = ptlrpc_req_async_args(req);
4136                 OBDO_ALLOC(oa);
4137                 if (!oa) {
4138                         ptlrpc_req_finished(req);
4139                         RETURN(-ENOMEM);
4140                 }
4141                 *oa = ((struct ost_body *)val)->oa;
4142                 aa->aa_oa = oa;
4143                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4144         }
4145
4146         ptlrpc_request_set_replen(req);
4147         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4148                 LASSERT(set != NULL);
4149                 ptlrpc_set_add_req(set, req);
4150                 ptlrpc_check_set(NULL, set);
4151         } else
4152                 ptlrpcd_add_req(req, PSCOPE_OTHER);
4153
4154         RETURN(0);
4155 }
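
/*
 * Usage sketch (for illustration only): toggling checksums on this export
 * via the generic obd_set_info_async() entry point. KEY_CHECKSUM is handled
 * locally above without an RPC, so no request set is needed for it:
 *
 *      int on = 1;
 *      int rc;
 *
 *      rc = obd_set_info_async(exp, strlen(KEY_CHECKSUM) + 1, KEY_CHECKSUM,
 *                              sizeof(on), &on, NULL);
 */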
4156
4157
4158 static struct llog_operations osc_size_repl_logops = {
4159         lop_cancel: llog_obd_repl_cancel
4160 };
4161
4162 static struct llog_operations osc_mds_ost_orig_logops;
4163
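/*
 * Set up the two llog contexts the OSC uses on the MDS: the originator
 * context for MDS->OST records (e.g. object destroy records) and the
 * size-changed replication context, whose remaining cancels are flushed to
 * the target in osc_disconnect() below.
 */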
4164 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4165                            struct obd_device *tgt, struct llog_catid *catid)
4166 {
4167         int rc;
4168         ENTRY;
4169
4170         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4171                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4172         if (rc) {
4173                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4174                 GOTO(out, rc);
4175         }
4176
4177         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4178                         NULL, &osc_size_repl_logops);
4179         if (rc) {
4180                 struct llog_ctxt *ctxt =
4181                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4182                 if (ctxt)
4183                         llog_cleanup(ctxt);
4184                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4185         }
4186         GOTO(out, rc);
4187 out:
4188         if (rc) {
4189                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4190                        obd->obd_name, tgt->obd_name, catid, rc);
4191                 CERROR("logid "LPX64":0x%x\n",
4192                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4193         }
4194         return rc;
4195 }
4196
4197 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4198                          struct obd_device *disk_obd, int *index)
4199 {
4200         struct llog_catid catid;
4201         static char name[32] = CATLIST;
4202         int rc;
4203         ENTRY;
4204
4205         LASSERT(olg == &obd->obd_olg);
4206
4207         cfs_mutex_down(&olg->olg_cat_processing);
4208         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4209         if (rc) {
                CERROR("llog_get_cat_list failed: rc = %d\n", rc);
4211                 GOTO(out, rc);
4212         }
4213
4214         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4215                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4216                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4217
4218         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4219         if (rc) {
                CERROR("__osc_llog_init failed: rc = %d\n", rc);
4221                 GOTO(out, rc);
4222         }
4223
4224         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4225         if (rc) {
                CERROR("llog_put_cat_list failed: rc = %d\n", rc);
4227                 GOTO(out, rc);
4228         }
4229
4230  out:
4231         cfs_mutex_up(&olg->olg_cat_processing);
4232
4233         return rc;
4234 }
4235
4236 static int osc_llog_finish(struct obd_device *obd, int count)
4237 {
4238         struct llog_ctxt *ctxt;
4239         int rc = 0, rc2 = 0;
4240         ENTRY;
4241
4242         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4243         if (ctxt)
4244                 rc = llog_cleanup(ctxt);
4245
4246         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4247         if (ctxt)
4248                 rc2 = llog_cleanup(ctxt);
4249         if (!rc)
4250                 rc = rc2;
4251
4252         RETURN(rc);
4253 }
4254
4255 static int osc_reconnect(const struct lu_env *env,
4256                          struct obd_export *exp, struct obd_device *obd,
4257                          struct obd_uuid *cluuid,
4258                          struct obd_connect_data *data,
4259                          void *localdata)
4260 {
        struct client_obd *cli = &obd->u.cli;
        ENTRY;

4263         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4264                 long lost_grant;
4265
4266                 client_obd_list_lock(&cli->cl_loi_list_lock);
4267                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4268                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4269                 lost_grant = cli->cl_lost_grant;
4270                 cli->cl_lost_grant = 0;
4271                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4272
4273                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4274                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4275                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4276                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4277                        " ocd_grant: %d\n", data->ocd_connect_flags,
4278                        data->ocd_version, data->ocd_grant);
4279         }
4280
4281         RETURN(0);
4282 }
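
/*
 * Worked example for the grant request above (numbers are illustrative,
 * assuming 4K pages): with cl_max_pages_per_rpc = 256, a client holding
 * neither grant nor dirty pages asks for 2 * 256 * 4096 = 2MB, enough for
 * two full-sized RPCs; otherwise it asks to keep exactly what it already
 * accounts for locally (cl_avail_grant + cl_dirty).
 */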
4283
4284 static int osc_disconnect(struct obd_export *exp)
4285 {
4286         struct obd_device *obd = class_exp2obd(exp);
4287         struct llog_ctxt  *ctxt;
4288         int rc;
4289
4290         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4291         if (ctxt) {
4292                 if (obd->u.cli.cl_conn_count == 1) {
4293                         /* Flush any remaining cancel messages out to the
4294                          * target */
4295                         llog_sync(ctxt, exp);
4296                 }
4297                 llog_ctxt_put(ctxt);
4298         } else {
4299                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4300                        obd);
4301         }
4302
4303         rc = client_disconnect_export(exp);
        /**
         * Initially we put del_shrink_grant before disconnect_export, but
         * that causes the following problem when setup (connect) and
         * cleanup (disconnect) are tangled together:
         *      connect p1                     disconnect p2
         *   ptlrpc_connect_import
         *     ...............               class_manual_cleanup
         *                                     osc_disconnect
         *                                     del_shrink_grant
         *   ptlrpc_connect_interrupt
         *     init_grant_shrink
         *   add this client to shrink list
         *                                      cleanup_osc
         * Bang! the pinger triggers the shrink.
         * So the osc should only be removed from the shrink list once we
         * are sure the import has been destroyed. See BUG 18662.
         */
4321         if (obd->u.cli.cl_import == NULL)
4322                 osc_del_shrink_grant(&obd->u.cli);
4323         return rc;
4324 }
4325
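/*
 * Import state-change notifications from the ptlrpc layer. Apart from
 * grant bookkeeping and the MDS-side object-creator flags, the OSC mostly
 * translates IMP_EVENT_* events into OBD_NOTIFY_* events for its observer
 * (normally the LOV).
 */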
4326 static int osc_import_event(struct obd_device *obd,
4327                             struct obd_import *imp,
4328                             enum obd_import_event event)
4329 {
4330         struct client_obd *cli;
4331         int rc = 0;
4332
4333         ENTRY;
4334         LASSERT(imp->imp_obd == obd);
4335
4336         switch (event) {
4337         case IMP_EVENT_DISCON: {
                /* Only do this for the MDS OSCs */
4339                 if (imp->imp_server_timeout) {
4340                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4341
4342                         cfs_spin_lock(&oscc->oscc_lock);
4343                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4344                         cfs_spin_unlock(&oscc->oscc_lock);
4345                 }
4346                 cli = &obd->u.cli;
4347                 client_obd_list_lock(&cli->cl_loi_list_lock);
4348                 cli->cl_avail_grant = 0;
4349                 cli->cl_lost_grant = 0;
4350                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4351                 break;
4352         }
4353         case IMP_EVENT_INACTIVE: {
4354                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4355                 break;
4356         }
4357         case IMP_EVENT_INVALIDATE: {
4358                 struct ldlm_namespace *ns = obd->obd_namespace;
4359                 struct lu_env         *env;
4360                 int                    refcheck;
4361
4362                 env = cl_env_get(&refcheck);
4363                 if (!IS_ERR(env)) {
4364                         /* Reset grants */
4365                         cli = &obd->u.cli;
4366                         client_obd_list_lock(&cli->cl_loi_list_lock);
                        /* all pages go into failing RPCs due to the
                         * invalid import */
4369                         osc_check_rpcs(env, cli);
4370                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4371
4372                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4373                         cl_env_put(env, &refcheck);
4374                 } else
4375                         rc = PTR_ERR(env);
4376                 break;
4377         }
4378         case IMP_EVENT_ACTIVE: {
                /* Only do this for the MDS OSCs */
4380                 if (imp->imp_server_timeout) {
4381                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4382
4383                         cfs_spin_lock(&oscc->oscc_lock);
4384                         oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
4385                         cfs_spin_unlock(&oscc->oscc_lock);
4386                 }
4387                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4388                 break;
4389         }
4390         case IMP_EVENT_OCD: {
4391                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4392
4393                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4394                         osc_init_grant(&obd->u.cli, ocd);
4395
4396                 /* See bug 7198 */
4397                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
                        imp->imp_client->cli_request_portal =
                                OST_REQUEST_PORTAL;
4399
4400                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4401                 break;
4402         }
4403         case IMP_EVENT_DEACTIVATE: {
4404                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4405                 break;
4406         }
4407         case IMP_EVENT_ACTIVATE: {
4408                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4409                 break;
4410         }
4411         default:
4412                 CERROR("Unknown import event %d\n", event);
4413                 LBUG();
4414         }
4415         RETURN(rc);
4416 }
4417
4418 /**
 * Determine whether a lock can be canceled rather than replayed during
 * recovery; see bug 16774 for details.
4421  *
4422  * \retval zero the lock can't be canceled
4423  * \retval other ok to cancel
4424  */
4425 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4426 {
4427         check_res_locked(lock->l_resource);
4428
        /*
         * Cancel all unused extent locks granted in LCK_PR or LCK_CR mode.
         *
         * XXX as a future improvement, we could also cancel unused write
         * locks that have no dirty data and no active mmaps.
         */
4435         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4436             (lock->l_granted_mode == LCK_PR ||
4437              lock->l_granted_mode == LCK_CR) &&
4438             (osc_dlm_lock_pageref(lock) == 0))
4439                 RETURN(1);
4440
4441         RETURN(0);
4442 }
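
/*
 * This callback is registered with the ldlm namespace in osc_setup()
 * below via ns_register_cancel(), so the ldlm layer can ask the OSC
 * whether a given replay candidate is worth keeping.
 */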
4443
4444 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4445 {
4446         int rc;
4447         ENTRY;
4448
4450         rc = ptlrpcd_addref();
4451         if (rc)
4452                 RETURN(rc);
4453
4454         rc = client_obd_setup(obd, lcfg);
4455         if (rc) {
4456                 ptlrpcd_decref();
4457         } else {
4458                 struct lprocfs_static_vars lvars = { 0 };
4459                 struct client_obd *cli = &obd->u.cli;
4460
4461                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4462                 lprocfs_osc_init_vars(&lvars);
4463                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4464                         lproc_osc_attach_seqstat(obd);
4465                         sptlrpc_lprocfs_cliobd_attach(obd);
4466                         ptlrpc_lprocfs_register_obd(obd);
4467                 }
4468
4469                 oscc_init(obd);
                /* We need to allocate a few extra requests, because
                   brw_interpret tries to create new requests before freeing
                   previous ones. Ideally we want 2x max_rpcs_in_flight
                   reserved, but that is probably too much wasted RAM in
                   practice, so an extra 2 should still work. */
4475                 cli->cl_import->imp_rq_pool =
4476                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4477                                             OST_MAXREQSIZE,
4478                                             ptlrpc_add_rqs_to_pool);
4479
4480                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4481                 cfs_sema_init(&cli->cl_grant_sem, 1);
4482
4483                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4484         }
4485
4486         RETURN(rc);
4487 }
4488
4489 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4490 {
4491         int rc = 0;
4492         ENTRY;
4493
4494         switch (stage) {
4495         case OBD_CLEANUP_EARLY: {
4496                 struct obd_import *imp;
4497                 imp = obd->u.cli.cl_import;
4498                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4499                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4500                 ptlrpc_deactivate_import(imp);
4501                 cfs_spin_lock(&imp->imp_lock);
4502                 imp->imp_pingable = 0;
4503                 cfs_spin_unlock(&imp->imp_lock);
4504                 break;
4505         }
4506         case OBD_CLEANUP_EXPORTS: {
                /* LU-464
                 * For the echo client, the export may be on the zombie
                 * list; wait for the zombie thread to cull it, because
                 * cli.cl_import will be cleared in
                 * client_disconnect_export():
                 *   class_export_destroy() -> obd_cleanup() ->
                 *   echo_device_free() -> echo_client_cleanup() ->
                 *   obd_disconnect() -> osc_disconnect() ->
                 *   client_disconnect_export()
                 */
4516                 obd_zombie_barrier();
4517                 obd_cleanup_client_import(obd);
4518                 rc = obd_llog_finish(obd, 0);
4519                 if (rc != 0)
4520                         CERROR("failed to cleanup llogging subsystems\n");
4521                 break;
        }
4523         }
4524         RETURN(rc);
4525 }
4526
4527 int osc_cleanup(struct obd_device *obd)
4528 {
4529         int rc;
4530
4531         ENTRY;
4532         ptlrpc_lprocfs_unregister_obd(obd);
4533         lprocfs_obd_cleanup(obd);
4534
4535         /* free memory of osc quota cache */
4536         lquota_cleanup(quota_interface, obd);
4537
4538         rc = client_obd_cleanup(obd);
4539
4540         ptlrpcd_decref();
4541         RETURN(rc);
4542 }
4543
4544 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4545 {
4546         struct lprocfs_static_vars lvars = { 0 };
4547         int rc = 0;
4548
4549         lprocfs_osc_init_vars(&lvars);
4550
4551         switch (lcfg->lcfg_command) {
4552         default:
4553                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4554                                               lcfg, obd);
4555                 if (rc > 0)
4556                         rc = 0;
4557                 break;
4558         }
4559
        return rc;
4561 }
4562
4563 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4564 {
4565         return osc_process_config_base(obd, buf);
4566 }
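
/*
 * Illustrative example: configuration arrives here as LCFG_PARAM records,
 * e.g. from a command such as
 *
 *      lctl conf_param <fsname>-OST0000.osc.checksums=1
 *
 * which class_process_proc_param() matches against the "osc." lprocfs
 * variables registered via lprocfs_osc_init_vars().
 */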
4567
4568 struct obd_ops osc_obd_ops = {
4569         .o_owner                = THIS_MODULE,
4570         .o_setup                = osc_setup,
4571         .o_precleanup           = osc_precleanup,
4572         .o_cleanup              = osc_cleanup,
4573         .o_add_conn             = client_import_add_conn,
4574         .o_del_conn             = client_import_del_conn,
4575         .o_connect              = client_connect_import,
4576         .o_reconnect            = osc_reconnect,
4577         .o_disconnect           = osc_disconnect,
4578         .o_statfs               = osc_statfs,
4579         .o_statfs_async         = osc_statfs_async,
4580         .o_packmd               = osc_packmd,
4581         .o_unpackmd             = osc_unpackmd,
4582         .o_precreate            = osc_precreate,
4583         .o_create               = osc_create,
4584         .o_create_async         = osc_create_async,
4585         .o_destroy              = osc_destroy,
4586         .o_getattr              = osc_getattr,
4587         .o_getattr_async        = osc_getattr_async,
4588         .o_setattr              = osc_setattr,
4589         .o_setattr_async        = osc_setattr_async,
4590         .o_brw                  = osc_brw,
4591         .o_punch                = osc_punch,
4592         .o_sync                 = osc_sync,
4593         .o_enqueue              = osc_enqueue,
4594         .o_change_cbdata        = osc_change_cbdata,
4595         .o_find_cbdata          = osc_find_cbdata,
4596         .o_cancel               = osc_cancel,
4597         .o_cancel_unused        = osc_cancel_unused,
4598         .o_iocontrol            = osc_iocontrol,
4599         .o_get_info             = osc_get_info,
4600         .o_set_info_async       = osc_set_info_async,
4601         .o_import_event         = osc_import_event,
4602         .o_llog_init            = osc_llog_init,
4603         .o_llog_finish          = osc_llog_finish,
4604         .o_process_config       = osc_process_config,
4605 };
4606
4607 extern struct lu_kmem_descr osc_caches[];
4608 extern cfs_spinlock_t       osc_ast_guard;
4609 extern cfs_lock_class_key_t osc_ast_guard_class;
4610
4611 int __init osc_init(void)
4612 {
4613         struct lprocfs_static_vars lvars = { 0 };
4614         int rc;
4615         ENTRY;
4616
        /* print the address of _any_ initialized kernel symbol from this
         * module, to allow debugging with a gdb that doesn't support data
         * symbols from modules. */
4620         CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
4621
        rc = lu_kmem_init(osc_caches);
        if (rc)
                RETURN(rc);
4623
4624         lprocfs_osc_init_vars(&lvars);
4625
4626         cfs_request_module("lquota");
4627         quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
4628         lquota_init(quota_interface);
4629         init_obd_quota_ops(quota_interface, &osc_obd_ops);
4630
4631         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4632                                  LUSTRE_OSC_NAME, &osc_device_type);
4633         if (rc) {
4634                 if (quota_interface)
4635                         PORTAL_SYMBOL_PUT(osc_quota_interface);
4636                 lu_kmem_fini(osc_caches);
4637                 RETURN(rc);
4638         }
4639
4640         cfs_spin_lock_init(&osc_ast_guard);
4641         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4642
4643         osc_mds_ost_orig_logops = llog_lvfs_ops;
4644         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4645         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4646         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4647         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4648
4649         RETURN(rc);
4650 }
4651
4652 #ifdef __KERNEL__
4653 static void /*__exit*/ osc_exit(void)
4654 {
4655         lu_device_type_fini(&osc_device_type);
4656
4657         lquota_exit(quota_interface);
4658         if (quota_interface)
4659                 PORTAL_SYMBOL_PUT(osc_quota_interface);
4660
4661         class_unregister_type(LUSTRE_OSC_NAME);
4662         lu_kmem_fini(osc_caches);
4663 }
4664
4665 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4666 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4667 MODULE_LICENSE("GPL");
4668
4669 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4670 #endif