Whamcloud - gitweb
LU-1347 build: remove the vim/emacs modelines
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #ifndef EXPORT_SYMTAB
38 # define EXPORT_SYMTAB
39 #endif
40 #define DEBUG_SUBSYSTEM S_OSC
41
42 #include <libcfs/libcfs.h>
43
44 #ifndef __KERNEL__
45 # include <liblustre.h>
46 #endif
47
48 #include <lustre_dlm.h>
49 #include <lustre_net.h>
50 #include <lustre/lustre_user.h>
51 #include <obd_cksum.h>
52 #include <obd_ost.h>
53 #include <obd_lov.h>
54
55 #ifdef  __CYGWIN__
56 # include <ctype.h>
57 #endif
58
59 #include <lustre_ha.h>
60 #include <lprocfs_status.h>
61 #include <lustre_log.h>
62 #include <lustre_debug.h>
63 #include <lustre_param.h>
64 #include "osc_internal.h"
65
66 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
67 static int brw_interpret(const struct lu_env *env,
68                          struct ptlrpc_request *req, void *data, int rc);
69 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
70                             int ptlrpc);
71 int osc_cleanup(struct obd_device *obd);
72
73 /* Pack OSC object metadata for disk storage (LE byte order). */
74 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
75                       struct lov_stripe_md *lsm)
76 {
77         int lmm_size;
78         ENTRY;
79
80         lmm_size = sizeof(**lmmp);
81         if (!lmmp)
82                 RETURN(lmm_size);
83
84         if (*lmmp && !lsm) {
85                 OBD_FREE(*lmmp, lmm_size);
86                 *lmmp = NULL;
87                 RETURN(0);
88         }
89
90         if (!*lmmp) {
91                 OBD_ALLOC(*lmmp, lmm_size);
92                 if (!*lmmp)
93                         RETURN(-ENOMEM);
94         }
95
96         if (lsm) {
97                 LASSERT(lsm->lsm_object_id);
98                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
99                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
100                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
101         }
102
103         RETURN(lmm_size);
104 }
105
106 /* Unpack OSC object metadata from disk storage (LE byte order). */
107 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
108                         struct lov_mds_md *lmm, int lmm_bytes)
109 {
110         int lsm_size;
111         struct obd_import *imp = class_exp2cliimp(exp);
112         ENTRY;
113
114         if (lmm != NULL) {
115                 if (lmm_bytes < sizeof (*lmm)) {
116                         CERROR("lov_mds_md too small: %d, need %d\n",
117                                lmm_bytes, (int)sizeof(*lmm));
118                         RETURN(-EINVAL);
119                 }
120                 /* XXX LOV_MAGIC etc check? */
121
122                 if (lmm->lmm_object_id == 0) {
123                         CERROR("lov_mds_md: zero lmm_object_id\n");
124                         RETURN(-EINVAL);
125                 }
126         }
127
128         lsm_size = lov_stripe_md_size(1);
129         if (lsmp == NULL)
130                 RETURN(lsm_size);
131
132         if (*lsmp != NULL && lmm == NULL) {
133                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
134                 OBD_FREE(*lsmp, lsm_size);
135                 *lsmp = NULL;
136                 RETURN(0);
137         }
138
139         if (*lsmp == NULL) {
140                 OBD_ALLOC(*lsmp, lsm_size);
141                 if (*lsmp == NULL)
142                         RETURN(-ENOMEM);
143                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
144                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
145                         OBD_FREE(*lsmp, lsm_size);
146                         RETURN(-ENOMEM);
147                 }
148                 loi_init((*lsmp)->lsm_oinfo[0]);
149         }
150
151         if (lmm != NULL) {
152                 /* XXX zero *lsmp? */
153                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
154                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
155                 LASSERT((*lsmp)->lsm_object_id);
156                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
157         }
158
159         if (imp != NULL &&
160             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
161                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
162         else
163                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
164
165         RETURN(lsm_size);
166 }
167
168 static inline void osc_pack_capa(struct ptlrpc_request *req,
169                                  struct ost_body *body, void *capa)
170 {
171         struct obd_capa *oc = (struct obd_capa *)capa;
172         struct lustre_capa *c;
173
174         if (!capa)
175                 return;
176
177         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
178         LASSERT(c);
179         capa_cpy(c, oc);
180         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
181         DEBUG_CAPA(D_SEC, c, "pack");
182 }
183
184 static inline void osc_pack_req_body(struct ptlrpc_request *req,
185                                      struct obd_info *oinfo)
186 {
187         struct ost_body *body;
188
189         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
190         LASSERT(body);
191
192         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
193         osc_pack_capa(req, body, oinfo->oi_capa);
194 }
195
196 static inline void osc_set_capa_size(struct ptlrpc_request *req,
197                                      const struct req_msg_field *field,
198                                      struct obd_capa *oc)
199 {
200         if (oc == NULL)
201                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
202         else
203                 /* it is already calculated as sizeof struct obd_capa */
204                 ;
205 }
206
207 static int osc_getattr_interpret(const struct lu_env *env,
208                                  struct ptlrpc_request *req,
209                                  struct osc_async_args *aa, int rc)
210 {
211         struct ost_body *body;
212         ENTRY;
213
214         if (rc != 0)
215                 GOTO(out, rc);
216
217         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
218         if (body) {
219                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
220                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
221
222                 /* This should really be sent by the OST */
223                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
224                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
225         } else {
226                 CDEBUG(D_INFO, "can't unpack ost_body\n");
227                 rc = -EPROTO;
228                 aa->aa_oi->oi_oa->o_valid = 0;
229         }
230 out:
231         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
232         RETURN(rc);
233 }
234
235 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
236                              struct ptlrpc_request_set *set)
237 {
238         struct ptlrpc_request *req;
239         struct osc_async_args *aa;
240         int                    rc;
241         ENTRY;
242
243         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
244         if (req == NULL)
245                 RETURN(-ENOMEM);
246
247         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
248         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
249         if (rc) {
250                 ptlrpc_request_free(req);
251                 RETURN(rc);
252         }
253
254         osc_pack_req_body(req, oinfo);
255
256         ptlrpc_request_set_replen(req);
257         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
258
259         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
260         aa = ptlrpc_req_async_args(req);
261         aa->aa_oi = oinfo;
262
263         ptlrpc_set_add_req(set, req);
264         RETURN(0);
265 }
266
267 static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
268 {
269         struct ptlrpc_request *req;
270         struct ost_body       *body;
271         int                    rc;
272         ENTRY;
273
274         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
275         if (req == NULL)
276                 RETURN(-ENOMEM);
277
278         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
279         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
280         if (rc) {
281                 ptlrpc_request_free(req);
282                 RETURN(rc);
283         }
284
285         osc_pack_req_body(req, oinfo);
286
287         ptlrpc_request_set_replen(req);
288
289         rc = ptlrpc_queue_wait(req);
290         if (rc)
291                 GOTO(out, rc);
292
293         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
294         if (body == NULL)
295                 GOTO(out, rc = -EPROTO);
296
297         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
298         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
299
300         /* This should really be sent by the OST */
301         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
302         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
303
304         EXIT;
305  out:
306         ptlrpc_req_finished(req);
307         return rc;
308 }
309
310 static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
311                        struct obd_trans_info *oti)
312 {
313         struct ptlrpc_request *req;
314         struct ost_body       *body;
315         int                    rc;
316         ENTRY;
317
318         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
319
320         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
321         if (req == NULL)
322                 RETURN(-ENOMEM);
323
324         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
325         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 RETURN(rc);
329         }
330
331         osc_pack_req_body(req, oinfo);
332
333         ptlrpc_request_set_replen(req);
334
335         rc = ptlrpc_queue_wait(req);
336         if (rc)
337                 GOTO(out, rc);
338
339         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
340         if (body == NULL)
341                 GOTO(out, rc = -EPROTO);
342
343         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
344
345         EXIT;
346 out:
347         ptlrpc_req_finished(req);
348         RETURN(rc);
349 }
350
351 static int osc_setattr_interpret(const struct lu_env *env,
352                                  struct ptlrpc_request *req,
353                                  struct osc_setattr_args *sa, int rc)
354 {
355         struct ost_body *body;
356         ENTRY;
357
358         if (rc != 0)
359                 GOTO(out, rc);
360
361         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
362         if (body == NULL)
363                 GOTO(out, rc = -EPROTO);
364
365         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
366 out:
367         rc = sa->sa_upcall(sa->sa_cookie, rc);
368         RETURN(rc);
369 }
370
371 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
372                            struct obd_trans_info *oti,
373                            obd_enqueue_update_f upcall, void *cookie,
374                            struct ptlrpc_request_set *rqset)
375 {
376         struct ptlrpc_request   *req;
377         struct osc_setattr_args *sa;
378         int                      rc;
379         ENTRY;
380
381         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
382         if (req == NULL)
383                 RETURN(-ENOMEM);
384
385         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
386         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
387         if (rc) {
388                 ptlrpc_request_free(req);
389                 RETURN(rc);
390         }
391
392         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
393                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
394
395         osc_pack_req_body(req, oinfo);
396
397         ptlrpc_request_set_replen(req);
398
399         /* do mds to ost setattr asynchronously */
400         if (!rqset) {
401                 /* Do not wait for response. */
402                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
403         } else {
404                 req->rq_interpret_reply =
405                         (ptlrpc_interpterer_t)osc_setattr_interpret;
406
407                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
408                 sa = ptlrpc_req_async_args(req);
409                 sa->sa_oa = oinfo->oi_oa;
410                 sa->sa_upcall = upcall;
411                 sa->sa_cookie = cookie;
412
413                 if (rqset == PTLRPCD_SET)
414                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
415                 else
416                         ptlrpc_set_add_req(rqset, req);
417         }
418
419         RETURN(0);
420 }
421
422 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
423                              struct obd_trans_info *oti,
424                              struct ptlrpc_request_set *rqset)
425 {
426         return osc_setattr_async_base(exp, oinfo, oti,
427                                       oinfo->oi_cb_up, oinfo, rqset);
428 }
429
430 int osc_real_create(struct obd_export *exp, struct obdo *oa,
431                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
432 {
433         struct ptlrpc_request *req;
434         struct ost_body       *body;
435         struct lov_stripe_md  *lsm;
436         int                    rc;
437         ENTRY;
438
439         LASSERT(oa);
440         LASSERT(ea);
441
442         lsm = *ea;
443         if (!lsm) {
444                 rc = obd_alloc_memmd(exp, &lsm);
445                 if (rc < 0)
446                         RETURN(rc);
447         }
448
449         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
450         if (req == NULL)
451                 GOTO(out, rc = -ENOMEM);
452
453         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
454         if (rc) {
455                 ptlrpc_request_free(req);
456                 GOTO(out, rc);
457         }
458
459         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
460         LASSERT(body);
461         lustre_set_wire_obdo(&body->oa, oa);
462
463         ptlrpc_request_set_replen(req);
464
465         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
466             oa->o_flags == OBD_FL_DELORPHAN) {
467                 DEBUG_REQ(D_HA, req,
468                           "delorphan from OST integration");
469                 /* Don't resend the delorphan req */
470                 req->rq_no_resend = req->rq_no_delay = 1;
471         }
472
473         rc = ptlrpc_queue_wait(req);
474         if (rc)
475                 GOTO(out_req, rc);
476
477         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
478         if (body == NULL)
479                 GOTO(out_req, rc = -EPROTO);
480
481         lustre_get_wire_obdo(oa, &body->oa);
482
483         /* This should really be sent by the OST */
484         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
485         oa->o_valid |= OBD_MD_FLBLKSZ;
486
487         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
488          * have valid lsm_oinfo data structs, so don't go touching that.
489          * This needs to be fixed in a big way.
490          */
491         lsm->lsm_object_id = oa->o_id;
492         lsm->lsm_object_seq = oa->o_seq;
493         *ea = lsm;
494
495         if (oti != NULL) {
496                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
497
498                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
499                         if (!oti->oti_logcookies)
500                                 oti_alloc_cookies(oti, 1);
501                         *oti->oti_logcookies = oa->o_lcookie;
502                 }
503         }
504
505         CDEBUG(D_HA, "transno: "LPD64"\n",
506                lustre_msg_get_transno(req->rq_repmsg));
507 out_req:
508         ptlrpc_req_finished(req);
509 out:
510         if (rc && !*ea)
511                 obd_free_memmd(exp, &lsm);
512         RETURN(rc);
513 }
514
515 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
516                    obd_enqueue_update_f upcall, void *cookie,
517                    struct ptlrpc_request_set *rqset)
518 {
519         struct ptlrpc_request   *req;
520         struct osc_setattr_args *sa;
521         struct ost_body         *body;
522         int                      rc;
523         ENTRY;
524
525         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
526         if (req == NULL)
527                 RETURN(-ENOMEM);
528
529         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
530         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
531         if (rc) {
532                 ptlrpc_request_free(req);
533                 RETURN(rc);
534         }
535         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
536         ptlrpc_at_set_req_timeout(req);
537
538         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
539         LASSERT(body);
540         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
541         osc_pack_capa(req, body, oinfo->oi_capa);
542
543         ptlrpc_request_set_replen(req);
544
545
546         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
547         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
548         sa = ptlrpc_req_async_args(req);
549         sa->sa_oa     = oinfo->oi_oa;
550         sa->sa_upcall = upcall;
551         sa->sa_cookie = cookie;
552         if (rqset == PTLRPCD_SET)
553                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
554         else
555                 ptlrpc_set_add_req(rqset, req);
556
557         RETURN(0);
558 }
559
560 static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
561                      struct obd_trans_info *oti,
562                      struct ptlrpc_request_set *rqset)
563 {
564         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
565         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
566         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
567         return osc_punch_base(exp, oinfo,
568                               oinfo->oi_cb_up, oinfo, rqset);
569 }
570
571 static int osc_sync_interpret(const struct lu_env *env,
572                               struct ptlrpc_request *req,
573                               void *arg, int rc)
574 {
575         struct osc_async_args *aa = arg;
576         struct ost_body *body;
577         ENTRY;
578
579         if (rc)
580                 GOTO(out, rc);
581
582         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
583         if (body == NULL) {
584                 CERROR ("can't unpack ost_body\n");
585                 GOTO(out, rc = -EPROTO);
586         }
587
588         *aa->aa_oi->oi_oa = body->oa;
589 out:
590         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
591         RETURN(rc);
592 }
593
594 static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
595                     obd_size start, obd_size end,
596                     struct ptlrpc_request_set *set)
597 {
598         struct ptlrpc_request *req;
599         struct ost_body       *body;
600         struct osc_async_args *aa;
601         int                    rc;
602         ENTRY;
603
604         if (!oinfo->oi_oa) {
605                 CDEBUG(D_INFO, "oa NULL\n");
606                 RETURN(-EINVAL);
607         }
608
609         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
610         if (req == NULL)
611                 RETURN(-ENOMEM);
612
613         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
614         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
615         if (rc) {
616                 ptlrpc_request_free(req);
617                 RETURN(rc);
618         }
619
620         /* overload the size and blocks fields in the oa with start/end */
621         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
622         LASSERT(body);
623         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
624         body->oa.o_size = start;
625         body->oa.o_blocks = end;
626         body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
627         osc_pack_capa(req, body, oinfo->oi_capa);
628
629         ptlrpc_request_set_replen(req);
630         req->rq_interpret_reply = osc_sync_interpret;
631
632         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
633         aa = ptlrpc_req_async_args(req);
634         aa->aa_oi = oinfo;
635
636         ptlrpc_set_add_req(set, req);
637         RETURN (0);
638 }
639
640 /* Find and cancel locally locks matched by @mode in the resource found by
641  * @objid. Found locks are added into @cancel list. Returns the amount of
642  * locks added to @cancels list. */
643 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
644                                    cfs_list_t *cancels,
645                                    ldlm_mode_t mode, int lock_flags)
646 {
647         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
648         struct ldlm_res_id res_id;
649         struct ldlm_resource *res;
650         int count;
651         ENTRY;
652
653         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
654         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
655         if (res == NULL)
656                 RETURN(0);
657
658         LDLM_RESOURCE_ADDREF(res);
659         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
660                                            lock_flags, 0, NULL);
661         LDLM_RESOURCE_DELREF(res);
662         ldlm_resource_putref(res);
663         RETURN(count);
664 }
665
666 static int osc_destroy_interpret(const struct lu_env *env,
667                                  struct ptlrpc_request *req, void *data,
668                                  int rc)
669 {
670         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
671
672         cfs_atomic_dec(&cli->cl_destroy_in_flight);
673         cfs_waitq_signal(&cli->cl_destroy_waitq);
674         return 0;
675 }
676
677 static int osc_can_send_destroy(struct client_obd *cli)
678 {
679         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
680             cli->cl_max_rpcs_in_flight) {
681                 /* The destroy request can be sent */
682                 return 1;
683         }
684         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
685             cli->cl_max_rpcs_in_flight) {
686                 /*
687                  * The counter has been modified between the two atomic
688                  * operations.
689                  */
690                 cfs_waitq_signal(&cli->cl_destroy_waitq);
691         }
692         return 0;
693 }
694
695 /* Destroy requests can be async always on the client, and we don't even really
696  * care about the return code since the client cannot do anything at all about
697  * a destroy failure.
698  * When the MDS is unlinking a filename, it saves the file objects into a
699  * recovery llog, and these object records are cancelled when the OST reports
700  * they were destroyed and sync'd to disk (i.e. transaction committed).
701  * If the client dies, or the OST is down when the object should be destroyed,
702  * the records are not cancelled, and when the OST reconnects to the MDS next,
703  * it will retrieve the llog unlink logs and then sends the log cancellation
704  * cookies to the MDS after committing destroy transactions. */
705 static int osc_destroy(struct obd_export *exp, struct obdo *oa,
706                        struct lov_stripe_md *ea, struct obd_trans_info *oti,
707                        struct obd_export *md_export, void *capa)
708 {
709         struct client_obd     *cli = &exp->exp_obd->u.cli;
710         struct ptlrpc_request *req;
711         struct ost_body       *body;
712         CFS_LIST_HEAD(cancels);
713         int rc, count;
714         ENTRY;
715
716         if (!oa) {
717                 CDEBUG(D_INFO, "oa NULL\n");
718                 RETURN(-EINVAL);
719         }
720
721         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
722                                         LDLM_FL_DISCARD_DATA);
723
724         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
725         if (req == NULL) {
726                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
727                 RETURN(-ENOMEM);
728         }
729
730         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
731         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
732                                0, &cancels, count);
733         if (rc) {
734                 ptlrpc_request_free(req);
735                 RETURN(rc);
736         }
737
738         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
739         ptlrpc_at_set_req_timeout(req);
740
741         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
742                 oa->o_lcookie = *oti->oti_logcookies;
743         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
744         LASSERT(body);
745         lustre_set_wire_obdo(&body->oa, oa);
746
747         osc_pack_capa(req, body, (struct obd_capa *)capa);
748         ptlrpc_request_set_replen(req);
749
750         /* don't throttle destroy RPCs for the MDT */
751         if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
752                 req->rq_interpret_reply = osc_destroy_interpret;
753                 if (!osc_can_send_destroy(cli)) {
754                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
755                                                           NULL);
756
757                         /*
758                          * Wait until the number of on-going destroy RPCs drops
759                          * under max_rpc_in_flight
760                          */
761                         l_wait_event_exclusive(cli->cl_destroy_waitq,
762                                                osc_can_send_destroy(cli), &lwi);
763                 }
764         }
765
766         /* Do not wait for response */
767         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
768         RETURN(0);
769 }
770
771 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
772                                 long writing_bytes)
773 {
774         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
775
776         LASSERT(!(oa->o_valid & bits));
777
778         oa->o_valid |= bits;
779         client_obd_list_lock(&cli->cl_loi_list_lock);
780         oa->o_dirty = cli->cl_dirty;
781         if (cli->cl_dirty - cli->cl_dirty_transit > cli->cl_dirty_max) {
782                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
783                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
784                 oa->o_undirty = 0;
785         } else if (cfs_atomic_read(&obd_dirty_pages) -
786                    cfs_atomic_read(&obd_dirty_transit_pages) >
787                    obd_max_dirty_pages + 1){
788                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
789                  * not covered by a lock thus they may safely race and trip
790                  * this CERROR() unless we add in a small fudge factor (+1). */
791                 CERROR("dirty %d - %d > system dirty_max %d\n",
792                        cfs_atomic_read(&obd_dirty_pages),
793                        cfs_atomic_read(&obd_dirty_transit_pages),
794                        obd_max_dirty_pages);
795                 oa->o_undirty = 0;
796         } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
797                 CERROR("dirty %lu - dirty_max %lu too big???\n",
798                        cli->cl_dirty, cli->cl_dirty_max);
799                 oa->o_undirty = 0;
800         } else {
801                 long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*
802                                 (cli->cl_max_rpcs_in_flight + 1);
803                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
804         }
805         oa->o_grant = cli->cl_avail_grant;
806         oa->o_dropped = cli->cl_lost_grant;
807         cli->cl_lost_grant = 0;
808         client_obd_list_unlock(&cli->cl_loi_list_lock);
809         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
810                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
811
812 }
813
814 static void osc_update_next_shrink(struct client_obd *cli)
815 {
816         cli->cl_next_shrink_grant =
817                 cfs_time_shift(cli->cl_grant_shrink_interval);
818         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
819                cli->cl_next_shrink_grant);
820 }
821
822 /* caller must hold loi_list_lock */
823 static void osc_consume_write_grant(struct client_obd *cli,
824                                     struct brw_page *pga)
825 {
826         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
827         LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
828         cfs_atomic_inc(&obd_dirty_pages);
829         cli->cl_dirty += CFS_PAGE_SIZE;
830         cli->cl_avail_grant -= CFS_PAGE_SIZE;
831         pga->flag |= OBD_BRW_FROM_GRANT;
832         CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
833                CFS_PAGE_SIZE, pga, pga->pg);
834         LASSERT(cli->cl_avail_grant >= 0);
835         osc_update_next_shrink(cli);
836 }
837
838 /* the companion to osc_consume_write_grant, called when a brw has completed.
839  * must be called with the loi lock held. */
840 static void osc_release_write_grant(struct client_obd *cli,
841                                     struct brw_page *pga, int sent)
842 {
843         int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
844         ENTRY;
845
846         LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
847         if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
848                 EXIT;
849                 return;
850         }
851
852         pga->flag &= ~OBD_BRW_FROM_GRANT;
853         cfs_atomic_dec(&obd_dirty_pages);
854         cli->cl_dirty -= CFS_PAGE_SIZE;
855         if (pga->flag & OBD_BRW_NOCACHE) {
856                 pga->flag &= ~OBD_BRW_NOCACHE;
857                 cfs_atomic_dec(&obd_dirty_transit_pages);
858                 cli->cl_dirty_transit -= CFS_PAGE_SIZE;
859         }
860         if (!sent) {
861                 /* Reclaim grant from truncated pages. This is used to solve
862                  * write-truncate and grant all gone(to lost_grant) problem.
863                  * For a vfs write this problem can be easily solved by a sync
864                  * write, however, this is not an option for page_mkwrite()
865                  * because grant has to be allocated before a page becomes
866                  * dirty. */
867                 if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
868                         cli->cl_avail_grant += CFS_PAGE_SIZE;
869                 else
870                         cli->cl_lost_grant += CFS_PAGE_SIZE;
871                 CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
872                        cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
873         } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
874                 /* For short writes we shouldn't count parts of pages that
875                  * span a whole block on the OST side, or our accounting goes
876                  * wrong.  Should match the code in filter_grant_check. */
877                 int offset = pga->off & ~CFS_PAGE_MASK;
878                 int count = pga->count + (offset & (blocksize - 1));
879                 int end = (offset + pga->count) & (blocksize - 1);
880                 if (end)
881                         count += blocksize - end;
882
883                 cli->cl_lost_grant += CFS_PAGE_SIZE - count;
884                 CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
885                        CFS_PAGE_SIZE - count, cli->cl_lost_grant,
886                        cli->cl_avail_grant, cli->cl_dirty);
887         }
888
889         EXIT;
890 }
891
892 static unsigned long rpcs_in_flight(struct client_obd *cli)
893 {
894         return cli->cl_r_in_flight + cli->cl_w_in_flight;
895 }
896
897 /* caller must hold loi_list_lock */
898 void osc_wake_cache_waiters(struct client_obd *cli)
899 {
900         cfs_list_t *l, *tmp;
901         struct osc_cache_waiter *ocw;
902
903         ENTRY;
904         cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
905                 /* if we can't dirty more, we must wait until some is written */
906                 if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
907                    (cfs_atomic_read(&obd_dirty_pages) + 1 >
908                     obd_max_dirty_pages)) {
909                         CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
910                                "osc max %ld, sys max %d\n", cli->cl_dirty,
911                                cli->cl_dirty_max, obd_max_dirty_pages);
912                         return;
913                 }
914
915                 /* if still dirty cache but no grant wait for pending RPCs that
916                  * may yet return us some grant before doing sync writes */
917                 if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
918                         CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
919                                cli->cl_w_in_flight);
920                         return;
921                 }
922
923                 ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
924                 cfs_list_del_init(&ocw->ocw_entry);
925                 if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
926                         /* no more RPCs in flight to return grant, do sync IO */
927                         ocw->ocw_rc = -EDQUOT;
928                         CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
929                 } else {
930                         osc_consume_write_grant(cli,
931                                                 &ocw->ocw_oap->oap_brw_page);
932                 }
933
934                 CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
935                        ocw, ocw->ocw_oap, cli->cl_avail_grant);
936
937                 cfs_waitq_signal(&ocw->ocw_waitq);
938         }
939
940         EXIT;
941 }
942
943 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
944 {
945         client_obd_list_lock(&cli->cl_loi_list_lock);
946         cli->cl_avail_grant += grant;
947         client_obd_list_unlock(&cli->cl_loi_list_lock);
948 }
949
950 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
951 {
952         if (body->oa.o_valid & OBD_MD_FLGRANT) {
953                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
954                 __osc_update_grant(cli, body->oa.o_grant);
955         }
956 }
957
958 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
959                               void *key, obd_count vallen, void *val,
960                               struct ptlrpc_request_set *set);
961
962 static int osc_shrink_grant_interpret(const struct lu_env *env,
963                                       struct ptlrpc_request *req,
964                                       void *aa, int rc)
965 {
966         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
967         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
968         struct ost_body *body;
969
970         if (rc != 0) {
971                 __osc_update_grant(cli, oa->o_grant);
972                 GOTO(out, rc);
973         }
974
975         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
976         LASSERT(body);
977         osc_update_grant(cli, body);
978 out:
979         OBDO_FREE(oa);
980         return rc;
981 }
982
983 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
984 {
985         client_obd_list_lock(&cli->cl_loi_list_lock);
986         oa->o_grant = cli->cl_avail_grant / 4;
987         cli->cl_avail_grant -= oa->o_grant;
988         client_obd_list_unlock(&cli->cl_loi_list_lock);
989         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
990                 oa->o_valid |= OBD_MD_FLFLAGS;
991                 oa->o_flags = 0;
992         }
993         oa->o_flags |= OBD_FL_SHRINK_GRANT;
994         osc_update_next_shrink(cli);
995 }
996
997 /* Shrink the current grant, either from some large amount to enough for a
998  * full set of in-flight RPCs, or if we have already shrunk to that limit
999  * then to enough for a single RPC.  This avoids keeping more grant than
1000  * needed, and avoids shrinking the grant piecemeal. */
1001 static int osc_shrink_grant(struct client_obd *cli)
1002 {
1003         long target = (cli->cl_max_rpcs_in_flight + 1) *
1004                       cli->cl_max_pages_per_rpc;
1005
1006         client_obd_list_lock(&cli->cl_loi_list_lock);
1007         if (cli->cl_avail_grant <= target)
1008                 target = cli->cl_max_pages_per_rpc;
1009         client_obd_list_unlock(&cli->cl_loi_list_lock);
1010
1011         return osc_shrink_grant_to_target(cli, target);
1012 }
1013
1014 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
1015 {
1016         int    rc = 0;
1017         struct ost_body     *body;
1018         ENTRY;
1019
1020         client_obd_list_lock(&cli->cl_loi_list_lock);
1021         /* Don't shrink if we are already above or below the desired limit
1022          * We don't want to shrink below a single RPC, as that will negatively
1023          * impact block allocation and long-term performance. */
1024         if (target < cli->cl_max_pages_per_rpc)
1025                 target = cli->cl_max_pages_per_rpc;
1026
1027         if (target >= cli->cl_avail_grant) {
1028                 client_obd_list_unlock(&cli->cl_loi_list_lock);
1029                 RETURN(0);
1030         }
1031         client_obd_list_unlock(&cli->cl_loi_list_lock);
1032
1033         OBD_ALLOC_PTR(body);
1034         if (!body)
1035                 RETURN(-ENOMEM);
1036
1037         osc_announce_cached(cli, &body->oa, 0);
1038
1039         client_obd_list_lock(&cli->cl_loi_list_lock);
1040         body->oa.o_grant = cli->cl_avail_grant - target;
1041         cli->cl_avail_grant = target;
1042         client_obd_list_unlock(&cli->cl_loi_list_lock);
1043         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
1044                 body->oa.o_valid |= OBD_MD_FLFLAGS;
1045                 body->oa.o_flags = 0;
1046         }
1047         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
1048         osc_update_next_shrink(cli);
1049
1050         rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
1051                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
1052                                 sizeof(*body), body, NULL);
1053         if (rc != 0)
1054                 __osc_update_grant(cli, body->oa.o_grant);
1055         OBD_FREE_PTR(body);
1056         RETURN(rc);
1057 }
1058
1059 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
1060 static int osc_should_shrink_grant(struct client_obd *client)
1061 {
1062         cfs_time_t time = cfs_time_current();
1063         cfs_time_t next_shrink = client->cl_next_shrink_grant;
1064
1065         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
1066              OBD_CONNECT_GRANT_SHRINK) == 0)
1067                 return 0;
1068
1069         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1070                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1071                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1072                         return 1;
1073                 else
1074                         osc_update_next_shrink(client);
1075         }
1076         return 0;
1077 }
1078
1079 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1080 {
1081         struct client_obd *client;
1082
1083         cfs_list_for_each_entry(client, &item->ti_obd_list,
1084                                 cl_grant_shrink_list) {
1085                 if (osc_should_shrink_grant(client))
1086                         osc_shrink_grant(client);
1087         }
1088         return 0;
1089 }
1090
1091 static int osc_add_shrink_grant(struct client_obd *client)
1092 {
1093         int rc;
1094
1095         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1096                                        TIMEOUT_GRANT,
1097                                        osc_grant_shrink_grant_cb, NULL,
1098                                        &client->cl_grant_shrink_list);
1099         if (rc) {
1100                 CERROR("add grant client %s error %d\n",
1101                         client->cl_import->imp_obd->obd_name, rc);
1102                 return rc;
1103         }
1104         CDEBUG(D_CACHE, "add grant client %s \n",
1105                client->cl_import->imp_obd->obd_name);
1106         osc_update_next_shrink(client);
1107         return 0;
1108 }
1109
1110 static int osc_del_shrink_grant(struct client_obd *client)
1111 {
1112         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1113                                          TIMEOUT_GRANT);
1114 }
1115
1116 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1117 {
1118         /*
1119          * ocd_grant is the total grant amount we're expect to hold: if we've
1120          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1121          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1122          *
1123          * race is tolerable here: if we're evicted, but imp_state already
1124          * left EVICTED state, then cl_dirty must be 0 already.
1125          */
1126         client_obd_list_lock(&cli->cl_loi_list_lock);
1127         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1128                 cli->cl_avail_grant = ocd->ocd_grant;
1129         else
1130                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1131
1132         if (cli->cl_avail_grant < 0) {
1133                 CWARN("%s: available grant < 0, the OSS is probably not running"
1134                       " with patch from bug20278 (%ld) \n",
1135                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1136                 /* workaround for 1.6 servers which do not have
1137                  * the patch from bug20278 */
1138                 cli->cl_avail_grant = ocd->ocd_grant;
1139         }
1140
1141         client_obd_list_unlock(&cli->cl_loi_list_lock);
1142
1143         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
1144                cli->cl_import->imp_obd->obd_name,
1145                cli->cl_avail_grant, cli->cl_lost_grant);
1146
1147         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1148             cfs_list_empty(&cli->cl_grant_shrink_list))
1149                 osc_add_shrink_grant(cli);
1150 }
1151
1152 /* We assume that the reason this OSC got a short read is because it read
1153  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1154  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1155  * this stripe never got written at or beyond this stripe offset yet. */
1156 static void handle_short_read(int nob_read, obd_count page_count,
1157                               struct brw_page **pga)
1158 {
1159         char *ptr;
1160         int i = 0;
1161
1162         /* skip bytes read OK */
1163         while (nob_read > 0) {
1164                 LASSERT (page_count > 0);
1165
1166                 if (pga[i]->count > nob_read) {
1167                         /* EOF inside this page */
1168                         ptr = cfs_kmap(pga[i]->pg) +
1169                                 (pga[i]->off & ~CFS_PAGE_MASK);
1170                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1171                         cfs_kunmap(pga[i]->pg);
1172                         page_count--;
1173                         i++;
1174                         break;
1175                 }
1176
1177                 nob_read -= pga[i]->count;
1178                 page_count--;
1179                 i++;
1180         }
1181
1182         /* zero remaining pages */
1183         while (page_count-- > 0) {
1184                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1185                 memset(ptr, 0, pga[i]->count);
1186                 cfs_kunmap(pga[i]->pg);
1187                 i++;
1188         }
1189 }
1190
1191 static int check_write_rcs(struct ptlrpc_request *req,
1192                            int requested_nob, int niocount,
1193                            obd_count page_count, struct brw_page **pga)
1194 {
1195         int     i;
1196         __u32   *remote_rcs;
1197
1198         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1199                                                   sizeof(*remote_rcs) *
1200                                                   niocount);
1201         if (remote_rcs == NULL) {
1202                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1203                 return(-EPROTO);
1204         }
1205
1206         /* return error if any niobuf was in error */
1207         for (i = 0; i < niocount; i++) {
1208                 if ((int)remote_rcs[i] < 0)
1209                         return(remote_rcs[i]);
1210
1211                 if (remote_rcs[i] != 0) {
1212                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1213                                 i, remote_rcs[i], req);
1214                         return(-EPROTO);
1215                 }
1216         }
1217
1218         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1219                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1220                        req->rq_bulk->bd_nob_transferred, requested_nob);
1221                 return(-EPROTO);
1222         }
1223
1224         return (0);
1225 }
1226
1227 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1228 {
1229         if (p1->flag != p2->flag) {
1230                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1231                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1232
1233                 /* warn if we try to combine flags that we don't know to be
1234                  * safe to combine */
1235                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1236                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1237                               "report this at http://bugs.whamcloud.com/\n",
1238                               p1->flag, p2->flag);
1239                 }
1240                 return 0;
1241         }
1242
1243         return (p1->off + p1->count == p2->off);
1244 }
1245
1246 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1247                                    struct brw_page **pga, int opc,
1248                                    cksum_type_t cksum_type)
1249 {
1250         __u32 cksum;
1251         int i = 0;
1252
1253         LASSERT (pg_count > 0);
1254         cksum = init_checksum(cksum_type);
1255         while (nob > 0 && pg_count > 0) {
1256                 unsigned char *ptr = cfs_kmap(pga[i]->pg);
1257                 int off = pga[i]->off & ~CFS_PAGE_MASK;
1258                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1259
1260                 /* corrupt the data before we compute the checksum, to
1261                  * simulate an OST->client data error */
1262                 if (i == 0 && opc == OST_READ &&
1263                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
1264                         memcpy(ptr + off, "bad1", min(4, nob));
1265                 cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
1266                 cfs_kunmap(pga[i]->pg);
1267                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1268                                off, cksum);
1269
1270                 nob -= pga[i]->count;
1271                 pg_count--;
1272                 i++;
1273         }
1274         /* For sending we only compute the wrong checksum instead
1275          * of corrupting the data so it is still correct on a redo */
1276         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1277                 cksum++;
1278
1279         return fini_checksum(cksum, cksum_type);
1280 }
1281
1282 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1283                                 struct lov_stripe_md *lsm, obd_count page_count,
1284                                 struct brw_page **pga,
1285                                 struct ptlrpc_request **reqp,
1286                                 struct obd_capa *ocapa, int reserve,
1287                                 int resend)
1288 {
1289         struct ptlrpc_request   *req;
1290         struct ptlrpc_bulk_desc *desc;
1291         struct ost_body         *body;
1292         struct obd_ioobj        *ioobj;
1293         struct niobuf_remote    *niobuf;
1294         int niocount, i, requested_nob, opc, rc;
1295         struct osc_brw_async_args *aa;
1296         struct req_capsule      *pill;
1297         struct brw_page *pg_prev;
1298
1299         ENTRY;
1300         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1301                 RETURN(-ENOMEM); /* Recoverable */
1302         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1303                 RETURN(-EINVAL); /* Fatal */
1304
1305         if ((cmd & OBD_BRW_WRITE) != 0) {
1306                 opc = OST_WRITE;
1307                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1308                                                 cli->cl_import->imp_rq_pool,
1309                                                 &RQF_OST_BRW_WRITE);
1310         } else {
1311                 opc = OST_READ;
1312                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1313         }
1314         if (req == NULL)
1315                 RETURN(-ENOMEM);
1316
1317         for (niocount = i = 1; i < page_count; i++) {
1318                 if (!can_merge_pages(pga[i - 1], pga[i]))
1319                         niocount++;
1320         }
1321
1322         pill = &req->rq_pill;
1323         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1324                              sizeof(*ioobj));
1325         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1326                              niocount * sizeof(*niobuf));
1327         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1328
1329         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1330         if (rc) {
1331                 ptlrpc_request_free(req);
1332                 RETURN(rc);
1333         }
1334         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1335         ptlrpc_at_set_req_timeout(req);
1336
1337         if (opc == OST_WRITE)
1338                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1339                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1340         else
1341                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1342                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1343
1344         if (desc == NULL)
1345                 GOTO(out, rc = -ENOMEM);
1346         /* NB request now owns desc and will free it when it gets freed */
1347
1348         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1349         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1350         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1351         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1352
1353         lustre_set_wire_obdo(&body->oa, oa);
1354
1355         obdo_to_ioobj(oa, ioobj);
1356         ioobj->ioo_bufcnt = niocount;
1357         osc_pack_capa(req, body, ocapa);
1358         LASSERT (page_count > 0);
1359         pg_prev = pga[0];
1360         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1361                 struct brw_page *pg = pga[i];
1362                 int poff = pg->off & ~CFS_PAGE_MASK;
1363
1364                 LASSERT(pg->count > 0);
1365                 /* make sure there is no gap in the middle of page array */
1366                 LASSERTF(page_count == 1 ||
1367                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1368                           ergo(i > 0 && i < page_count - 1,
1369                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1370                           ergo(i == page_count - 1, poff == 0)),
1371                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1372                          i, page_count, pg, pg->off, pg->count);
1373 #ifdef __linux__
1374                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1375                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1376                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1377                          i, page_count,
1378                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1379                          pg_prev->pg, page_private(pg_prev->pg),
1380                          pg_prev->pg->index, pg_prev->off);
1381 #else
1382                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1383                          "i %d p_c %u\n", i, page_count);
1384 #endif
1385                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1386                         (pg->flag & OBD_BRW_SRVLOCK));
1387
1388                 ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
1389                 requested_nob += pg->count;
1390
1391                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1392                         niobuf--;
1393                         niobuf->len += pg->count;
1394                 } else {
1395                         niobuf->offset = pg->off;
1396                         niobuf->len    = pg->count;
1397                         niobuf->flags  = pg->flag;
1398                 }
1399                 pg_prev = pg;
1400         }
1401
1402         LASSERTF((void *)(niobuf - niocount) ==
1403                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1404                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1405                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1406
1407         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1408         if (resend) {
1409                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1410                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1411                         body->oa.o_flags = 0;
1412                 }
1413                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1414         }
1415
1416         if (osc_should_shrink_grant(cli))
1417                 osc_shrink_grant_local(cli, &body->oa);
1418
1419         /* size[REQ_REC_OFF] still sizeof (*body) */
1420         if (opc == OST_WRITE) {
1421                 if (cli->cl_checksum &&
1422                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1423                         /* store cl_cksum_type in a local variable since
1424                          * it can be changed via lprocfs */
1425                         cksum_type_t cksum_type = cli->cl_cksum_type;
1426
1427                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1428                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1429                                 body->oa.o_flags = 0;
1430                         }
1431                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1432                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1433                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1434                                                              page_count, pga,
1435                                                              OST_WRITE,
1436                                                              cksum_type);
1437                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1438                                body->oa.o_cksum);
1439                         /* save this in 'oa', too, for later checking */
1440                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1441                         oa->o_flags |= cksum_type_pack(cksum_type);
1442                 } else {
1443                         /* clear out the checksum flag, in case this is a
1444                          * resend but cl_checksum is no longer set. b=11238 */
1445                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1446                 }
1447                 oa->o_cksum = body->oa.o_cksum;
1448                 /* 1 RC per niobuf */
1449                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1450                                      sizeof(__u32) * niocount);
1451         } else {
1452                 if (cli->cl_checksum &&
1453                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1454                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1455                                 body->oa.o_flags = 0;
1456                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1457                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1458                 }
1459         }
1460         ptlrpc_request_set_replen(req);
1461
1462         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1463         aa = ptlrpc_req_async_args(req);
1464         aa->aa_oa = oa;
1465         aa->aa_requested_nob = requested_nob;
1466         aa->aa_nio_count = niocount;
1467         aa->aa_page_count = page_count;
1468         aa->aa_resends = 0;
1469         aa->aa_ppga = pga;
1470         aa->aa_cli = cli;
1471         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1472         if (ocapa && reserve)
1473                 aa->aa_ocapa = capa_get(ocapa);
1474
1475         *reqp = req;
1476         RETURN(0);
1477
1478  out:
1479         ptlrpc_req_finished(req);
1480         RETURN(rc);
1481 }
1482
1483 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1484                                 __u32 client_cksum, __u32 server_cksum, int nob,
1485                                 obd_count page_count, struct brw_page **pga,
1486                                 cksum_type_t client_cksum_type)
1487 {
1488         __u32 new_cksum;
1489         char *msg;
1490         cksum_type_t cksum_type;
1491
1492         if (server_cksum == client_cksum) {
1493                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1494                 return 0;
1495         }
1496
1497         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1498                                        oa->o_flags : 0);
1499         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1500                                       cksum_type);
1501
1502         if (cksum_type != client_cksum_type)
1503                 msg = "the server did not use the checksum type specified in "
1504                       "the original request - likely a protocol problem";
1505         else if (new_cksum == server_cksum)
1506                 msg = "changed on the client after we checksummed it - "
1507                       "likely false positive due to mmap IO (bug 11742)";
1508         else if (new_cksum == client_cksum)
1509                 msg = "changed in transit before arrival at OST";
1510         else
1511                 msg = "changed in transit AND doesn't match the original - "
1512                       "likely false positive due to mmap IO (bug 11742)";
1513
1514         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1515                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1516                            msg, libcfs_nid2str(peer->nid),
1517                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1518                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1519                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1520                            oa->o_id,
1521                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1522                            pga[0]->off,
1523                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1524         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1525                "client csum now %x\n", client_cksum, client_cksum_type,
1526                server_cksum, cksum_type, new_cksum);
1527         return 1;
1528 }
1529
1530 /* Note rc enters this function as number of bytes transferred */
1531 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1532 {
1533         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1534         const lnet_process_id_t *peer =
1535                         &req->rq_import->imp_connection->c_peer;
1536         struct client_obd *cli = aa->aa_cli;
1537         struct ost_body *body;
1538         __u32 client_cksum = 0;
1539         ENTRY;
1540
1541         if (rc < 0 && rc != -EDQUOT) {
1542                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1543                 RETURN(rc);
1544         }
1545
1546         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1547         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1548         if (body == NULL) {
1549                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1550                 RETURN(-EPROTO);
1551         }
1552
1553         /* set/clear over quota flag for a uid/gid */
1554         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1555             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1556                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1557
1558                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1559                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1560                        body->oa.o_flags);
1561                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1562         }
1563
1564         osc_update_grant(cli, body);
1565
1566         if (rc < 0)
1567                 RETURN(rc);
1568
1569         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1570                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1571
1572         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1573                 if (rc > 0) {
1574                         CERROR("Unexpected +ve rc %d\n", rc);
1575                         RETURN(-EPROTO);
1576                 }
1577                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1578
1579                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1580                         RETURN(-EAGAIN);
1581
1582                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1583                     check_write_checksum(&body->oa, peer, client_cksum,
1584                                          body->oa.o_cksum, aa->aa_requested_nob,
1585                                          aa->aa_page_count, aa->aa_ppga,
1586                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1587                         RETURN(-EAGAIN);
1588
1589                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1590                                      aa->aa_page_count, aa->aa_ppga);
1591                 GOTO(out, rc);
1592         }
1593
1594         /* The rest of this function executes only for OST_READs */
1595
1596         /* if unwrap_bulk failed, return -EAGAIN to retry */
1597         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1598         if (rc < 0)
1599                 GOTO(out, rc = -EAGAIN);
1600
1601         if (rc > aa->aa_requested_nob) {
1602                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1603                        aa->aa_requested_nob);
1604                 RETURN(-EPROTO);
1605         }
1606
1607         if (rc != req->rq_bulk->bd_nob_transferred) {
1608                 CERROR ("Unexpected rc %d (%d transferred)\n",
1609                         rc, req->rq_bulk->bd_nob_transferred);
1610                 return (-EPROTO);
1611         }
1612
1613         if (rc < aa->aa_requested_nob)
1614                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1615
1616         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1617                 static int cksum_counter;
1618                 __u32      server_cksum = body->oa.o_cksum;
1619                 char      *via;
1620                 char      *router;
1621                 cksum_type_t cksum_type;
1622
1623                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1624                                                body->oa.o_flags : 0);
1625                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1626                                                  aa->aa_ppga, OST_READ,
1627                                                  cksum_type);
1628
1629                 if (peer->nid == req->rq_bulk->bd_sender) {
1630                         via = router = "";
1631                 } else {
1632                         via = " via ";
1633                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1634                 }
1635
1636                 if (server_cksum == ~0 && rc > 0) {
1637                         CERROR("Protocol error: server %s set the 'checksum' "
1638                                "bit, but didn't send a checksum.  Not fatal, "
1639                                "but please notify on http://bugs.whamcloud.com/\n",
1640                                libcfs_nid2str(peer->nid));
1641                 } else if (server_cksum != client_cksum) {
1642                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1643                                            "%s%s%s inode "DFID" object "
1644                                            LPU64"/"LPU64" extent "
1645                                            "["LPU64"-"LPU64"]\n",
1646                                            req->rq_import->imp_obd->obd_name,
1647                                            libcfs_nid2str(peer->nid),
1648                                            via, router,
1649                                            body->oa.o_valid & OBD_MD_FLFID ?
1650                                                 body->oa.o_parent_seq : (__u64)0,
1651                                            body->oa.o_valid & OBD_MD_FLFID ?
1652                                                 body->oa.o_parent_oid : 0,
1653                                            body->oa.o_valid & OBD_MD_FLFID ?
1654                                                 body->oa.o_parent_ver : 0,
1655                                            body->oa.o_id,
1656                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1657                                                 body->oa.o_seq : (__u64)0,
1658                                            aa->aa_ppga[0]->off,
1659                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1660                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1661                                                                         1);
1662                         CERROR("client %x, server %x, cksum_type %x\n",
1663                                client_cksum, server_cksum, cksum_type);
1664                         cksum_counter = 0;
1665                         aa->aa_oa->o_cksum = client_cksum;
1666                         rc = -EAGAIN;
1667                 } else {
1668                         cksum_counter++;
1669                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1670                         rc = 0;
1671                 }
1672         } else if (unlikely(client_cksum)) {
1673                 static int cksum_missed;
1674
1675                 cksum_missed++;
1676                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1677                         CERROR("Checksum %u requested from %s but not sent\n",
1678                                cksum_missed, libcfs_nid2str(peer->nid));
1679         } else {
1680                 rc = 0;
1681         }
1682 out:
1683         if (rc >= 0)
1684                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1685
1686         RETURN(rc);
1687 }
1688
1689 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1690                             struct lov_stripe_md *lsm,
1691                             obd_count page_count, struct brw_page **pga,
1692                             struct obd_capa *ocapa)
1693 {
1694         struct ptlrpc_request *req;
1695         int                    rc;
1696         cfs_waitq_t            waitq;
1697         int                    generation, resends = 0;
1698         struct l_wait_info     lwi;
1699
1700         ENTRY;
1701
1702         cfs_waitq_init(&waitq);
1703         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1704
1705 restart_bulk:
1706         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1707                                   page_count, pga, &req, ocapa, 0, resends);
1708         if (rc != 0)
1709                 return (rc);
1710
1711         if (resends) {
1712                 req->rq_generation_set = 1;
1713                 req->rq_import_generation = generation;
1714                 req->rq_sent = cfs_time_current_sec() + resends;
1715         }
1716
1717         rc = ptlrpc_queue_wait(req);
1718
1719         if (rc == -ETIMEDOUT && req->rq_resend) {
1720                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1721                 ptlrpc_req_finished(req);
1722                 goto restart_bulk;
1723         }
1724
1725         rc = osc_brw_fini_request(req, rc);
1726
1727         ptlrpc_req_finished(req);
1728         /* When server return -EINPROGRESS, client should always retry
1729          * regardless of the number of times the bulk was resent already.*/
1730         if (osc_recoverable_error(rc)) {
1731                 resends++;
1732                 if (rc != -EINPROGRESS &&
1733                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1734                         CERROR("%s: too many resend retries for object: "
1735                                ""LPU64":"LPU64", rc = %d.\n",
1736                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1737                         goto out;
1738                 }
1739                 if (generation !=
1740                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1741                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1742                                ""LPU64":"LPU64", rc = %d.\n",
1743                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1744                         goto out;
1745                 }
1746
1747                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1748                                        NULL);
1749                 l_wait_event(waitq, 0, &lwi);
1750
1751                 goto restart_bulk;
1752         }
1753 out:
1754         if (rc == -EAGAIN || rc == -EINPROGRESS)
1755                 rc = -EIO;
1756         RETURN (rc);
1757 }
1758
1759 int osc_brw_redo_request(struct ptlrpc_request *request,
1760                          struct osc_brw_async_args *aa)
1761 {
1762         struct ptlrpc_request *new_req;
1763         struct ptlrpc_request_set *set = request->rq_set;
1764         struct osc_brw_async_args *new_aa;
1765         struct osc_async_page *oap;
1766         int rc = 0;
1767         ENTRY;
1768
1769         DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
1770
1771         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1772                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1773                                   aa->aa_cli, aa->aa_oa,
1774                                   NULL /* lsm unused by osc currently */,
1775                                   aa->aa_page_count, aa->aa_ppga,
1776                                   &new_req, aa->aa_ocapa, 0, 1);
1777         if (rc)
1778                 RETURN(rc);
1779
1780         client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
1781
1782         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1783                 if (oap->oap_request != NULL) {
1784                         LASSERTF(request == oap->oap_request,
1785                                  "request %p != oap_request %p\n",
1786                                  request, oap->oap_request);
1787                         if (oap->oap_interrupted) {
1788                                 client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1789                                 ptlrpc_req_finished(new_req);
1790                                 RETURN(-EINTR);
1791                         }
1792                 }
1793         }
1794         /* New request takes over pga and oaps from old request.
1795          * Note that copying a list_head doesn't work, need to move it... */
1796         aa->aa_resends++;
1797         new_req->rq_interpret_reply = request->rq_interpret_reply;
1798         new_req->rq_async_args = request->rq_async_args;
1799         new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1800         new_req->rq_generation_set = 1;
1801         new_req->rq_import_generation = request->rq_import_generation;
1802
1803         new_aa = ptlrpc_req_async_args(new_req);
1804
1805         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1806         cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
1807         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1808
1809         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1810                 if (oap->oap_request) {
1811                         ptlrpc_req_finished(oap->oap_request);
1812                         oap->oap_request = ptlrpc_request_addref(new_req);
1813                 }
1814         }
1815
1816         new_aa->aa_ocapa = aa->aa_ocapa;
1817         aa->aa_ocapa = NULL;
1818
1819         /* use ptlrpc_set_add_req is safe because interpret functions work
1820          * in check_set context. only one way exist with access to request
1821          * from different thread got -EINTR - this way protected with
1822          * cl_loi_list_lock */
1823         ptlrpc_set_add_req(set, new_req);
1824
1825         client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
1826
1827         DEBUG_REQ(D_INFO, new_req, "new request");
1828         RETURN(0);
1829 }
1830
1831 /*
1832  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1833  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1834  * fine for our small page arrays and doesn't require allocation.  its an
1835  * insertion sort that swaps elements that are strides apart, shrinking the
1836  * stride down until its '1' and the array is sorted.
1837  */
1838 static void sort_brw_pages(struct brw_page **array, int num)
1839 {
1840         int stride, i, j;
1841         struct brw_page *tmp;
1842
1843         if (num == 1)
1844                 return;
1845         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1846                 ;
1847
1848         do {
1849                 stride /= 3;
1850                 for (i = stride ; i < num ; i++) {
1851                         tmp = array[i];
1852                         j = i;
1853                         while (j >= stride && array[j - stride]->off > tmp->off) {
1854                                 array[j] = array[j - stride];
1855                                 j -= stride;
1856                         }
1857                         array[j] = tmp;
1858                 }
1859         } while (stride > 1);
1860 }
1861
1862 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1863 {
1864         int count = 1;
1865         int offset;
1866         int i = 0;
1867
1868         LASSERT (pages > 0);
1869         offset = pg[i]->off & ~CFS_PAGE_MASK;
1870
1871         for (;;) {
1872                 pages--;
1873                 if (pages == 0)         /* that's all */
1874                         return count;
1875
1876                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1877                         return count;   /* doesn't end on page boundary */
1878
1879                 i++;
1880                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1881                 if (offset != 0)        /* doesn't start on page boundary */
1882                         return count;
1883
1884                 count++;
1885         }
1886 }
1887
1888 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1889 {
1890         struct brw_page **ppga;
1891         int i;
1892
1893         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1894         if (ppga == NULL)
1895                 return NULL;
1896
1897         for (i = 0; i < count; i++)
1898                 ppga[i] = pga + i;
1899         return ppga;
1900 }
1901
1902 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1903 {
1904         LASSERT(ppga != NULL);
1905         OBD_FREE(ppga, sizeof(*ppga) * count);
1906 }
1907
1908 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1909                    obd_count page_count, struct brw_page *pga,
1910                    struct obd_trans_info *oti)
1911 {
1912         struct obdo *saved_oa = NULL;
1913         struct brw_page **ppga, **orig;
1914         struct obd_import *imp = class_exp2cliimp(exp);
1915         struct client_obd *cli;
1916         int rc, page_count_orig;
1917         ENTRY;
1918
1919         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1920         cli = &imp->imp_obd->u.cli;
1921
1922         if (cmd & OBD_BRW_CHECK) {
1923                 /* The caller just wants to know if there's a chance that this
1924                  * I/O can succeed */
1925
1926                 if (imp->imp_invalid)
1927                         RETURN(-EIO);
1928                 RETURN(0);
1929         }
1930
1931         /* test_brw with a failed create can trip this, maybe others. */
1932         LASSERT(cli->cl_max_pages_per_rpc);
1933
1934         rc = 0;
1935
1936         orig = ppga = osc_build_ppga(pga, page_count);
1937         if (ppga == NULL)
1938                 RETURN(-ENOMEM);
1939         page_count_orig = page_count;
1940
1941         sort_brw_pages(ppga, page_count);
1942         while (page_count) {
1943                 obd_count pages_per_brw;
1944
1945                 if (page_count > cli->cl_max_pages_per_rpc)
1946                         pages_per_brw = cli->cl_max_pages_per_rpc;
1947                 else
1948                         pages_per_brw = page_count;
1949
1950                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1951
1952                 if (saved_oa != NULL) {
1953                         /* restore previously saved oa */
1954                         *oinfo->oi_oa = *saved_oa;
1955                 } else if (page_count > pages_per_brw) {
1956                         /* save a copy of oa (brw will clobber it) */
1957                         OBDO_ALLOC(saved_oa);
1958                         if (saved_oa == NULL)
1959                                 GOTO(out, rc = -ENOMEM);
1960                         *saved_oa = *oinfo->oi_oa;
1961                 }
1962
1963                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1964                                       pages_per_brw, ppga, oinfo->oi_capa);
1965
1966                 if (rc != 0)
1967                         break;
1968
1969                 page_count -= pages_per_brw;
1970                 ppga += pages_per_brw;
1971         }
1972
1973 out:
1974         osc_release_ppga(orig, page_count_orig);
1975
1976         if (saved_oa != NULL)
1977                 OBDO_FREE(saved_oa);
1978
1979         RETURN(rc);
1980 }
1981
1982 /* The companion to osc_enter_cache(), called when @oap is no longer part of
1983  * the dirty accounting.  Writeback completes or truncate happens before
1984  * writing starts.  Must be called with the loi lock held. */
1985 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
1986                            int sent)
1987 {
1988         osc_release_write_grant(cli, &oap->oap_brw_page, sent);
1989 }
1990
1991
1992 /* This maintains the lists of pending pages to read/write for a given object
1993  * (lop).  This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
1994  * to quickly find objects that are ready to send an RPC. */
1995 static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
1996                          int cmd)
1997 {
1998         ENTRY;
1999
2000         if (lop->lop_num_pending == 0)
2001                 RETURN(0);
2002
2003         /* if we have an invalid import we want to drain the queued pages
2004          * by forcing them through rpcs that immediately fail and complete
2005          * the pages.  recovery relies on this to empty the queued pages
2006          * before canceling the locks and evicting down the llite pages */
2007         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
2008                 RETURN(1);
2009
2010         /* stream rpcs in queue order as long as as there is an urgent page
2011          * queued.  this is our cheap solution for good batching in the case
2012          * where writepage marks some random page in the middle of the file
2013          * as urgent because of, say, memory pressure */
2014         if (!cfs_list_empty(&lop->lop_urgent)) {
2015                 CDEBUG(D_CACHE, "urgent request forcing RPC\n");
2016                 RETURN(1);
2017         }
2018
2019         if (cmd & OBD_BRW_WRITE) {
2020                 /* trigger a write rpc stream as long as there are dirtiers
2021                  * waiting for space.  as they're waiting, they're not going to
2022                  * create more pages to coalesce with what's waiting.. */
2023                 if (!cfs_list_empty(&cli->cl_cache_waiters)) {
2024                         CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
2025                         RETURN(1);
2026                 }
2027         }
2028         if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
2029                 RETURN(1);
2030
2031         RETURN(0);
2032 }
2033
2034 static int lop_makes_hprpc(struct loi_oap_pages *lop)
2035 {
2036         struct osc_async_page *oap;
2037         ENTRY;
2038
2039         if (cfs_list_empty(&lop->lop_urgent))
2040                 RETURN(0);
2041
2042         oap = cfs_list_entry(lop->lop_urgent.next,
2043                          struct osc_async_page, oap_urgent_item);
2044
2045         if (oap->oap_async_flags & ASYNC_HP) {
2046                 CDEBUG(D_CACHE, "hp request forcing RPC\n");
2047                 RETURN(1);
2048         }
2049
2050         RETURN(0);
2051 }
2052
2053 static void on_list(cfs_list_t *item, cfs_list_t *list,
2054                     int should_be_on)
2055 {
2056         if (cfs_list_empty(item) && should_be_on)
2057                 cfs_list_add_tail(item, list);
2058         else if (!cfs_list_empty(item) && !should_be_on)
2059                 cfs_list_del_init(item);
2060 }
2061
2062 /* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
2063  * can find pages to build into rpcs quickly */
2064 void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
2065 {
2066         if (lop_makes_hprpc(&loi->loi_write_lop) ||
2067             lop_makes_hprpc(&loi->loi_read_lop)) {
2068                 /* HP rpc */
2069                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
2070                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
2071         } else {
2072                 on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
2073                 on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
2074                         lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
2075                         lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
2076         }
2077
2078         on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
2079                 loi->loi_write_lop.lop_num_pending);
2080
2081         on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
2082                 loi->loi_read_lop.lop_num_pending);
2083 }
2084
2085 static void lop_update_pending(struct client_obd *cli,
2086                                struct loi_oap_pages *lop, int cmd, int delta)
2087 {
2088         lop->lop_num_pending += delta;
2089         if (cmd & OBD_BRW_WRITE)
2090                 cli->cl_pending_w_pages += delta;
2091         else
2092                 cli->cl_pending_r_pages += delta;
2093 }
2094
2095 /**
2096  * this is called when a sync waiter receives an interruption.  Its job is to
2097  * get the caller woken as soon as possible.  If its page hasn't been put in an
2098  * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
2099  * desiring interruption which will forcefully complete the rpc once the rpc
2100  * has timed out.
2101  */
2102 int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
2103 {
2104         struct loi_oap_pages *lop;
2105         struct lov_oinfo *loi;
2106         int rc = -EBUSY;
2107         ENTRY;
2108
2109         LASSERT(!oap->oap_interrupted);
2110         oap->oap_interrupted = 1;
2111
2112         /* ok, it's been put in an rpc. only one oap gets a request reference */
2113         if (oap->oap_request != NULL) {
2114                 ptlrpc_mark_interrupted(oap->oap_request);
2115                 ptlrpcd_wake(oap->oap_request);
2116                 ptlrpc_req_finished(oap->oap_request);
2117                 oap->oap_request = NULL;
2118         }
2119
2120         /*
2121          * page completion may be called only if ->cpo_prep() method was
2122          * executed by osc_io_submit(), that also adds page the to pending list
2123          */
2124         if (!cfs_list_empty(&oap->oap_pending_item)) {
2125                 cfs_list_del_init(&oap->oap_pending_item);
2126                 cfs_list_del_init(&oap->oap_urgent_item);
2127
2128                 loi = oap->oap_loi;
2129                 lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
2130                         &loi->loi_write_lop : &loi->loi_read_lop;
2131                 lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
2132                 loi_list_maint(oap->oap_cli, oap->oap_loi);
2133                 rc = oap->oap_caller_ops->ap_completion(env,
2134                                           oap->oap_caller_data,
2135                                           oap->oap_cmd, NULL, -EINTR);
2136         }
2137
2138         RETURN(rc);
2139 }
2140
2141 /* this is trying to propogate async writeback errors back up to the
2142  * application.  As an async write fails we record the error code for later if
2143  * the app does an fsync.  As long as errors persist we force future rpcs to be
2144  * sync so that the app can get a sync error and break the cycle of queueing
2145  * pages for which writeback will fail. */
2146 static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
2147                            int rc)
2148 {
2149         if (rc) {
2150                 if (!ar->ar_rc)
2151                         ar->ar_rc = rc;
2152
2153                 ar->ar_force_sync = 1;
2154                 ar->ar_min_xid = ptlrpc_sample_next_xid();
2155                 return;
2156
2157         }
2158
2159         if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
2160                 ar->ar_force_sync = 0;
2161 }
2162
2163 void osc_oap_to_pending(struct osc_async_page *oap)
2164 {
2165         struct loi_oap_pages *lop;
2166
2167         if (oap->oap_cmd & OBD_BRW_WRITE)
2168                 lop = &oap->oap_loi->loi_write_lop;
2169         else
2170                 lop = &oap->oap_loi->loi_read_lop;
2171
2172         if (oap->oap_async_flags & ASYNC_HP)
2173                 cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
2174         else if (oap->oap_async_flags & ASYNC_URGENT)
2175                 cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
2176         cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
2177         lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
2178 }
2179
2180 /* this must be called holding the loi list lock to give coverage to exit_cache,
2181  * async_flag maintenance, and oap_request */
2182 static void osc_ap_completion(const struct lu_env *env,
2183                               struct client_obd *cli, struct obdo *oa,
2184                               struct osc_async_page *oap, int sent, int rc)
2185 {
2186         __u64 xid = 0;
2187
2188         ENTRY;
2189         if (oap->oap_request != NULL) {
2190                 xid = ptlrpc_req_xid(oap->oap_request);
2191                 ptlrpc_req_finished(oap->oap_request);
2192                 oap->oap_request = NULL;
2193         }
2194
2195         cfs_spin_lock(&oap->oap_lock);
2196         oap->oap_async_flags = 0;
2197         cfs_spin_unlock(&oap->oap_lock);
2198         oap->oap_interrupted = 0;
2199
2200         if (oap->oap_cmd & OBD_BRW_WRITE) {
2201                 osc_process_ar(&cli->cl_ar, xid, rc);
2202                 osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
2203         }
2204
2205         if (rc == 0 && oa != NULL) {
2206                 if (oa->o_valid & OBD_MD_FLBLOCKS)
2207                         oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
2208                 if (oa->o_valid & OBD_MD_FLMTIME)
2209                         oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
2210                 if (oa->o_valid & OBD_MD_FLATIME)
2211                         oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
2212                 if (oa->o_valid & OBD_MD_FLCTIME)
2213                         oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
2214         }
2215
2216         rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
2217                                                 oap->oap_cmd, oa, rc);
2218
2219         /* cl_page_completion() drops PG_locked. so, a new I/O on the page could
2220          * start, but OSC calls it under lock and thus we can add oap back to
2221          * pending safely */
2222         if (rc)
2223                 /* upper layer wants to leave the page on pending queue */
2224                 osc_oap_to_pending(oap);
2225         else
2226                 osc_exit_cache(cli, oap, sent);
2227         EXIT;
2228 }
2229
2230 static int brw_queue_work(const struct lu_env *env, void *data)
2231 {
2232         struct client_obd *cli = data;
2233
2234         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
2235
2236         client_obd_list_lock(&cli->cl_loi_list_lock);
2237         osc_check_rpcs0(env, cli, 1);
2238         client_obd_list_unlock(&cli->cl_loi_list_lock);
2239         RETURN(0);
2240 }
2241
2242 static int brw_interpret(const struct lu_env *env,
2243                          struct ptlrpc_request *req, void *data, int rc)
2244 {
2245         struct osc_brw_async_args *aa = data;
2246         struct client_obd *cli;
2247         int async;
2248         ENTRY;
2249
2250         rc = osc_brw_fini_request(req, rc);
2251         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
2252         /* When server return -EINPROGRESS, client should always retry
2253          * regardless of the number of times the bulk was resent already. */
2254         if (osc_recoverable_error(rc)) {
2255                 if (req->rq_import_generation !=
2256                     req->rq_import->imp_generation) {
2257                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
2258                                ""LPU64":"LPU64", rc = %d.\n",
2259                                req->rq_import->imp_obd->obd_name,
2260                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2261                 } else if (rc == -EINPROGRESS ||
2262                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
2263                         rc = osc_brw_redo_request(req, aa);
2264                 } else {
2265                         CERROR("%s: too many resent retries for object: "
2266                                ""LPU64":"LPU64", rc = %d.\n",
2267                                req->rq_import->imp_obd->obd_name,
2268                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
2269                 }
2270
2271                 if (rc == 0)
2272                         RETURN(0);
2273                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
2274                         rc = -EIO;
2275         }
2276
2277         if (aa->aa_ocapa) {
2278                 capa_put(aa->aa_ocapa);
2279                 aa->aa_ocapa = NULL;
2280         }
2281
2282         cli = aa->aa_cli;
2283         client_obd_list_lock(&cli->cl_loi_list_lock);
2284
2285         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2286          * is called so we know whether to go to sync BRWs or wait for more
2287          * RPCs to complete */
2288         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2289                 cli->cl_w_in_flight--;
2290         else
2291                 cli->cl_r_in_flight--;
2292
2293         async = cfs_list_empty(&aa->aa_oaps);
2294         if (!async) { /* from osc_send_oap_rpc() */
2295                 struct osc_async_page *oap, *tmp;
2296                 /* the caller may re-use the oap after the completion call so
2297                  * we need to clean it up a little */
2298                 cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
2299                                              oap_rpc_item) {
2300                         cfs_list_del_init(&oap->oap_rpc_item);
2301                         osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
2302                 }
2303                 OBDO_FREE(aa->aa_oa);
2304         } else { /* from async_internal() */
2305                 obd_count i;
2306                 for (i = 0; i < aa->aa_page_count; i++)
2307                         osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
2308         }
2309         osc_wake_cache_waiters(cli);
2310         osc_check_rpcs0(env, cli, 1);
2311         client_obd_list_unlock(&cli->cl_loi_list_lock);
2312
2313         if (!async)
2314                 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2315                                   req->rq_bulk->bd_nob_transferred);
2316         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2317         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2318
2319         RETURN(rc);
2320 }
2321
2322 static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
2323                                             struct client_obd *cli,
2324                                             cfs_list_t *rpc_list,
2325                                             int page_count, int cmd)
2326 {
2327         struct ptlrpc_request *req;
2328         struct brw_page **pga = NULL;
2329         struct osc_brw_async_args *aa;
2330         struct obdo *oa = NULL;
2331         const struct obd_async_page_ops *ops = NULL;
2332         struct osc_async_page *oap;
2333         struct osc_async_page *tmp;
2334         struct cl_req *clerq = NULL;
2335         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2336         struct ldlm_lock *lock = NULL;
2337         struct cl_req_attr crattr;
2338         int i, rc, mpflag = 0;
2339
2340         ENTRY;
2341         LASSERT(!cfs_list_empty(rpc_list));
2342
2343         if (cmd & OBD_BRW_MEMALLOC)
2344                 mpflag = cfs_memory_pressure_get_and_set();
2345
2346         memset(&crattr, 0, sizeof crattr);
2347         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2348         if (pga == NULL)
2349                 GOTO(out, req = ERR_PTR(-ENOMEM));
2350
2351         OBDO_ALLOC(oa);
2352         if (oa == NULL)
2353                 GOTO(out, req = ERR_PTR(-ENOMEM));
2354
2355         i = 0;
2356         cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
2357                 struct cl_page *page = osc_oap2cl_page(oap);
2358                 if (ops == NULL) {
2359                         ops = oap->oap_caller_ops;
2360
2361                         clerq = cl_req_alloc(env, page, crt,
2362                                              1 /* only 1-object rpcs for
2363                                                 * now */);
2364                         if (IS_ERR(clerq))
2365                                 GOTO(out, req = (void *)clerq);
2366                         lock = oap->oap_ldlm_lock;
2367                 }
2368                 pga[i] = &oap->oap_brw_page;
2369                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2370                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2371                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2372                 i++;
2373                 cl_req_page_add(env, clerq, page);
2374         }
2375
2376         /* always get the data for the obdo for the rpc */
2377         LASSERT(ops != NULL);
2378         crattr.cra_oa = oa;
2379         crattr.cra_capa = NULL;
2380         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2381         if (lock) {
2382                 oa->o_handle = lock->l_remote_handle;
2383                 oa->o_valid |= OBD_MD_FLHANDLE;
2384         }
2385
2386         rc = cl_req_prep(env, clerq);
2387         if (rc != 0) {
2388                 CERROR("cl_req_prep failed: %d\n", rc);
2389                 GOTO(out, req = ERR_PTR(rc));
2390         }
2391
2392         sort_brw_pages(pga, page_count);
2393         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2394                                   pga, &req, crattr.cra_capa, 1, 0);
2395         if (rc != 0) {
2396                 CERROR("prep_req failed: %d\n", rc);
2397                 GOTO(out, req = ERR_PTR(rc));
2398         }
2399
2400         if (cmd & OBD_BRW_MEMALLOC)
2401                 req->rq_memalloc = 1;
2402
2403         /* Need to update the timestamps after the request is built in case
2404          * we race with setattr (locally or in queue at OST).  If OST gets
2405          * later setattr before earlier BRW (as determined by the request xid),
2406          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2407          * way to do this in a single call.  bug 10150 */
2408         cl_req_attr_set(env, clerq, &crattr,
2409                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2410
2411         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2412         aa = ptlrpc_req_async_args(req);
2413         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2414         cfs_list_splice(rpc_list, &aa->aa_oaps);
2415         CFS_INIT_LIST_HEAD(rpc_list);
2416         aa->aa_clerq = clerq;
2417 out:
2418         if (cmd & OBD_BRW_MEMALLOC)
2419                 cfs_memory_pressure_restore(mpflag);
2420
2421         capa_put(crattr.cra_capa);
2422         if (IS_ERR(req)) {
2423                 if (oa)
2424                         OBDO_FREE(oa);
2425                 if (pga)
2426                         OBD_FREE(pga, sizeof(*pga) * page_count);
2427                 /* this should happen rarely and is pretty bad, it makes the
2428                  * pending list not follow the dirty order */
2429                 client_obd_list_lock(&cli->cl_loi_list_lock);
2430                 cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
2431                         cfs_list_del_init(&oap->oap_rpc_item);
2432
2433                         /* queued sync pages can be torn down while the pages
2434                          * were between the pending list and the rpc */
2435                         if (oap->oap_interrupted) {
2436                                 CDEBUG(D_INODE, "oap %p interrupted\n", oap);
2437                                 osc_ap_completion(env, cli, NULL, oap, 0,
2438                                                   oap->oap_count);
2439                                 continue;
2440                         }
2441                         osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
2442                 }
2443                 if (clerq && !IS_ERR(clerq))
2444                         cl_req_completion(env, clerq, PTR_ERR(req));
2445         }
2446         RETURN(req);
2447 }
2448
2449 /**
2450  * prepare pages for ASYNC io and put pages in send queue.
2451  *
2452  * \param cmd OBD_BRW_* macroses
2453  * \param lop pending pages
2454  *
2455  * \return zero if no page added to send queue.
2456  * \return 1 if pages successfully added to send queue.
2457  * \return negative on errors.
2458  */
2459 static int
2460 osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
2461                  struct lov_oinfo *loi, int cmd,
2462                  struct loi_oap_pages *lop, pdl_policy_t pol)
2463 {
2464         struct ptlrpc_request *req;
2465         obd_count page_count = 0;
2466         struct osc_async_page *oap = NULL, *tmp;
2467         struct osc_brw_async_args *aa;
2468         const struct obd_async_page_ops *ops;
2469         CFS_LIST_HEAD(rpc_list);
2470         int srvlock = 0, mem_tight = 0;
2471         struct cl_object *clob = NULL;
2472         obd_off starting_offset = OBD_OBJECT_EOF;
2473         unsigned int ending_offset;
2474         int starting_page_off = 0;
2475         ENTRY;
2476
2477         /* ASYNC_HP pages first. At present, when the lock the pages is
2478          * to be canceled, the pages covered by the lock will be sent out
2479          * with ASYNC_HP. We have to send out them as soon as possible. */
2480         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
2481                 if (oap->oap_async_flags & ASYNC_HP)
2482                         cfs_list_move(&oap->oap_pending_item, &rpc_list);
2483                 else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
2484                         /* only do this for writeback pages. */
2485                         cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
2486                 if (++page_count >= cli->cl_max_pages_per_rpc)
2487                         break;
2488         }
2489         cfs_list_splice_init(&rpc_list, &lop->lop_pending);
2490         page_count = 0;
2491
2492         /* first we find the pages we're allowed to work with */
2493         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
2494                                      oap_pending_item) {
2495                 ops = oap->oap_caller_ops;
2496
2497                 LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
2498                          "magic 0x%x\n", oap, oap->oap_magic);
2499
2500                 if (clob == NULL) {
2501                         /* pin object in memory, so that completion call-backs
2502                          * can be safely called under client_obd_list lock. */
2503                         clob = osc_oap2cl_page(oap)->cp_obj;
2504                         cl_object_get(clob);
2505                 }
2506
2507                 if (page_count != 0 &&
2508                     srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
2509                         CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
2510                                " oap %p, page %p, srvlock %u\n",
2511                                oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
2512                         break;
2513                 }
2514
2515                 /* If there is a gap at the start of this page, it can't merge
2516                  * with any previous page, so we'll hand the network a
2517                  * "fragmented" page array that it can't transfer in 1 RDMA */
2518                 if (oap->oap_obj_off < starting_offset) {
2519                         if (starting_page_off != 0)
2520                                 break;
2521
2522                         starting_page_off = oap->oap_page_off;
2523                         starting_offset = oap->oap_obj_off + starting_page_off;
2524                 } else if (oap->oap_page_off != 0)
2525                         break;
2526
2527                 /* in llite being 'ready' equates to the page being locked
2528                  * until completion unlocks it.  commit_write submits a page
2529                  * as not ready because its unlock will happen unconditionally
2530                  * as the call returns.  if we race with commit_write giving
2531                  * us that page we don't want to create a hole in the page
2532                  * stream, so we stop and leave the rpc to be fired by
2533                  * another dirtier or kupdated interval (the not ready page
2534                  * will still be on the dirty list).  we could call in
2535                  * at the end of ll_file_write to process the queue again. */
2536                 if (!(oap->oap_async_flags & ASYNC_READY)) {
2537                         int rc = ops->ap_make_ready(env, oap->oap_caller_data,
2538                                                     cmd);
2539                         if (rc < 0)
2540                                 CDEBUG(D_INODE, "oap %p page %p returned %d "
2541                                                 "instead of ready\n", oap,
2542                                                 oap->oap_page, rc);
2543                         switch (rc) {
2544                         case -EAGAIN:
2545                                 /* llite is telling us that the page is still
2546                                  * in commit_write and that we should try
2547                                  * and put it in an rpc again later.  we
2548                                  * break out of the loop so we don't create
2549                                  * a hole in the sequence of pages in the rpc
2550                                  * stream.*/
2551                                 oap = NULL;
2552                                 break;
2553                         case -EINTR:
2554                                 /* the io isn't needed.. tell the checks
2555                                  * below to complete the rpc with EINTR */
2556                                 cfs_spin_lock(&oap->oap_lock);
2557                                 oap->oap_async_flags |= ASYNC_COUNT_STABLE;
2558                                 cfs_spin_unlock(&oap->oap_lock);
2559                                 oap->oap_count = -EINTR;
2560                                 break;
2561                         case 0:
2562                                 cfs_spin_lock(&oap->oap_lock);
2563                                 oap->oap_async_flags |= ASYNC_READY;
2564                                 cfs_spin_unlock(&oap->oap_lock);
2565                                 break;
2566                         default:
2567                                 LASSERTF(0, "oap %p page %p returned %d "
2568                                             "from make_ready\n", oap,
2569                                             oap->oap_page, rc);
2570                                 break;
2571                         }
2572                 }
2573                 if (oap == NULL)
2574                         break;
2575
2576                 /* take the page out of our book-keeping */
2577                 cfs_list_del_init(&oap->oap_pending_item);
2578                 lop_update_pending(cli, lop, cmd, -1);
2579                 cfs_list_del_init(&oap->oap_urgent_item);
2580
2581                 /* ask the caller for the size of the io as the rpc leaves. */
2582                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
2583                         oap->oap_count =
2584                                 ops->ap_refresh_count(env, oap->oap_caller_data,
2585                                                       cmd);
2586                         LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
2587                 }
2588                 if (oap->oap_count <= 0) {
2589                         CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
2590                                oap->oap_count);
2591                         osc_ap_completion(env, cli, NULL,
2592                                           oap, 0, oap->oap_count);
2593                         continue;
2594                 }
2595
2596                 /* now put the page back in our accounting */
2597                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2598                 if (page_count++ == 0)
2599                         srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
2600
2601                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
2602                         mem_tight = 1;
2603
2604                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
2605                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
2606                  * have the same alignment as the initial writes that allocated
2607                  * extents on the server. */
2608                 ending_offset = oap->oap_obj_off + oap->oap_page_off +
2609                                 oap->oap_count;
2610                 if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
2611                         break;
2612
2613                 if (page_count >= cli->cl_max_pages_per_rpc)
2614                         break;
2615
2616                 /* If there is a gap at the end of this page, it can't merge
2617                  * with any subsequent pages, so we'll hand the network a
2618                  * "fragmented" page array that it can't transfer in 1 RDMA */
2619                 if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
2620                         break;
2621         }
2622
2623         loi_list_maint(cli, loi);
2624
2625         client_obd_list_unlock(&cli->cl_loi_list_lock);
2626
2627         if (clob != NULL)
2628                 cl_object_put(env, clob);
2629
2630         if (page_count == 0) {
2631                 client_obd_list_lock(&cli->cl_loi_list_lock);
2632                 RETURN(0);
2633         }
2634
2635         req = osc_build_req(env, cli, &rpc_list, page_count,
2636                             mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
2637         if (IS_ERR(req)) {
2638                 LASSERT(cfs_list_empty(&rpc_list));
2639                 loi_list_maint(cli, loi);
2640                 RETURN(PTR_ERR(req));
2641         }
2642
2643         aa = ptlrpc_req_async_args(req);
2644
2645         starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
2646         if (cmd == OBD_BRW_READ) {
2647                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2648                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2649                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2650                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2651         } else {
2652                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2653                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
2654                                  cli->cl_w_in_flight);
2655                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2656                                       (starting_offset >> CFS_PAGE_SHIFT) + 1);
2657         }
2658
2659         client_obd_list_lock(&cli->cl_loi_list_lock);
2660
2661         if (cmd == OBD_BRW_READ)
2662                 cli->cl_r_in_flight++;
2663         else
2664                 cli->cl_w_in_flight++;
2665
2666         /* queued sync pages can be torn down while the pages
2667          * were between the pending list and the rpc */
2668         tmp = NULL;
2669         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2670                 /* only one oap gets a request reference */
2671                 if (tmp == NULL)
2672                         tmp = oap;
2673                 if (oap->oap_interrupted && !req->rq_intr) {
2674                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2675                                oap, req);
2676                         ptlrpc_mark_interrupted(req);
2677                 }
2678         }
2679         if (tmp != NULL)
2680                 tmp->oap_request = ptlrpc_request_addref(req);
2681
2682         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2683                   page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
2684
2685         req->rq_interpret_reply = brw_interpret;
2686
2687         /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
2688          *      CPU/NUMA node the majority of pages were allocated on, and try
2689          *      to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
2690          *      to reduce cross-CPU memory traffic.
2691          *
2692          *      But on the other hand, we expect that multiple ptlrpcd threads
2693          *      and the initial write sponsor can run in parallel, especially
2694          *      when data checksum is enabled, which is CPU-bound operation and
2695          *      single ptlrpcd thread cannot process in time. So more ptlrpcd
2696          *      threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
2697          */
2698         ptlrpcd_add_req(req, pol, -1);
2699         RETURN(1);
2700 }
2701
2702 #define LOI_DEBUG(LOI, STR, args...)                                     \
2703         CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR,           \
2704                !cfs_list_empty(&(LOI)->loi_ready_item) ||                \
2705                !cfs_list_empty(&(LOI)->loi_hp_ready_item),               \
2706                (LOI)->loi_write_lop.lop_num_pending,                     \
2707                !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent),        \
2708                (LOI)->loi_read_lop.lop_num_pending,                      \
2709                !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent),         \
2710                args)                                                     \
2711
2712 /* This is called by osc_check_rpcs() to find which objects have pages that
2713  * we could be sending.  These lists are maintained by lop_makes_rpc(). */
2714 struct lov_oinfo *osc_next_loi(struct client_obd *cli)
2715 {
2716         ENTRY;
2717
2718         /* First return objects that have blocked locks so that they
2719          * will be flushed quickly and other clients can get the lock,
2720          * then objects which have pages ready to be stuffed into RPCs */
2721         if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
2722                 RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
2723                                       struct lov_oinfo, loi_hp_ready_item));
2724         if (!cfs_list_empty(&cli->cl_loi_ready_list))
2725                 RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
2726                                       struct lov_oinfo, loi_ready_item));
2727
2728         /* then if we have cache waiters, return all objects with queued
2729          * writes.  This is especially important when many small files
2730          * have filled up the cache and not been fired into rpcs because
2731          * they don't pass the nr_pending/object threshhold */
2732         if (!cfs_list_empty(&cli->cl_cache_waiters) &&
2733             !cfs_list_empty(&cli->cl_loi_write_list))
2734                 RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2735                                       struct lov_oinfo, loi_write_item));
2736
2737         /* then return all queued objects when we have an invalid import
2738          * so that they get flushed */
2739         if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
2740                 if (!cfs_list_empty(&cli->cl_loi_write_list))
2741                         RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
2742                                               struct lov_oinfo,
2743                                               loi_write_item));
2744                 if (!cfs_list_empty(&cli->cl_loi_read_list))
2745                         RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
2746                                               struct lov_oinfo, loi_read_item));
2747         }
2748         RETURN(NULL);
2749 }
2750
2751 static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
2752 {
2753         struct osc_async_page *oap;
2754         int hprpc = 0;
2755
2756         if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
2757                 oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
2758                                      struct osc_async_page, oap_urgent_item);
2759                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2760         }
2761
2762         if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
2763                 oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
2764                                      struct osc_async_page, oap_urgent_item);
2765                 hprpc = !!(oap->oap_async_flags & ASYNC_HP);
2766         }
2767
2768         return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
2769 }
2770
2771 /* called with the loi list lock held */
2772 static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
2773 {
2774         struct lov_oinfo *loi;
2775         int rc = 0, race_counter = 0;
2776         pdl_policy_t pol;
2777         ENTRY;
2778
2779         pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
2780
2781         while ((loi = osc_next_loi(cli)) != NULL) {
2782                 LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
2783
2784                 if (osc_max_rpc_in_flight(cli, loi))
2785                         break;
2786
2787                 /* attempt some read/write balancing by alternating between
2788                  * reads and writes in an object.  The makes_rpc checks here
2789                  * would be redundant if we were getting read/write work items
2790                  * instead of objects.  we don't want send_oap_rpc to drain a
2791                  * partial read pending queue when we're given this object to
2792                  * do io on writes while there are cache waiters */
2793                 if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
2794                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
2795                                               &loi->loi_write_lop, pol);
2796                         if (rc < 0) {
2797                                 CERROR("Write request failed with %d\n", rc);
2798
2799                                 /* osc_send_oap_rpc failed, mostly because of
2800                                  * memory pressure.
2801                                  *
2802                                  * It can't break here, because if:
2803                                  *  - a page was submitted by osc_io_submit, so
2804                                  *    page locked;
2805                                  *  - no request in flight
2806                                  *  - no subsequent request
2807                                  * The system will be in live-lock state,
2808                                  * because there is no chance to call
2809                                  * osc_io_unplug() and osc_check_rpcs() any
2810                                  * more. pdflush can't help in this case,
2811                                  * because it might be blocked at grabbing
2812                                  * the page lock as we mentioned.
2813                                  *
2814                                  * Anyway, continue to drain pages. */
2815                                 /* break; */
2816                         }
2817
2818                         if (rc > 0)
2819                                 race_counter = 0;
2820                         else if (rc == 0)
2821                                 race_counter++;
2822                 }
2823                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
2824                         rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
2825                                               &loi->loi_read_lop, pol);
2826                         if (rc < 0)
2827                                 CERROR("Read request failed with %d\n", rc);
2828
2829                         if (rc > 0)
2830                                 race_counter = 0;
2831                         else if (rc == 0)
2832                                 race_counter++;
2833                 }
2834
2835                 /* attempt some inter-object balancing by issuing rpcs
2836                  * for each object in turn */
2837                 if (!cfs_list_empty(&loi->loi_hp_ready_item))
2838                         cfs_list_del_init(&loi->loi_hp_ready_item);
2839                 if (!cfs_list_empty(&loi->loi_ready_item))
2840                         cfs_list_del_init(&loi->loi_ready_item);
2841                 if (!cfs_list_empty(&loi->loi_write_item))
2842                         cfs_list_del_init(&loi->loi_write_item);
2843                 if (!cfs_list_empty(&loi->loi_read_item))
2844                         cfs_list_del_init(&loi->loi_read_item);
2845
2846                 loi_list_maint(cli, loi);
2847
2848                 /* send_oap_rpc fails with 0 when make_ready tells it to
2849                  * back off.  llite's make_ready does this when it tries
2850                  * to lock a page queued for write that is already locked.
2851                  * we want to try sending rpcs from many objects, but we
2852                  * don't want to spin failing with 0.  */
2853                 if (race_counter == 10)
2854                         break;
2855         }
2856 }
2857
2858 void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
2859 {
2860         osc_check_rpcs0(env, cli, 0);
2861 }
2862
2863 /**
2864  * Non-blocking version of osc_enter_cache() that consumes grant only when it
2865  * is available.
2866  */
2867 int osc_enter_cache_try(const struct lu_env *env,
2868                         struct client_obd *cli, struct lov_oinfo *loi,
2869                         struct osc_async_page *oap, int transient)
2870 {
2871         int has_grant;
2872
2873         has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
2874         if (has_grant) {
2875                 osc_consume_write_grant(cli, &oap->oap_brw_page);
2876                 if (transient) {
2877                         cli->cl_dirty_transit += CFS_PAGE_SIZE;
2878                         cfs_atomic_inc(&obd_dirty_transit_pages);
2879                         oap->oap_brw_flags |= OBD_BRW_NOCACHE;
2880                 }
2881         }
2882         return has_grant;
2883 }
2884
2885 /* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
2886  * grant or cache space. */
2887 static int osc_enter_cache(const struct lu_env *env,
2888                            struct client_obd *cli, struct lov_oinfo *loi,
2889                            struct osc_async_page *oap)
2890 {
2891         struct osc_cache_waiter ocw;
2892         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2893         int rc = -EDQUOT;
2894         ENTRY;
2895
2896         CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
2897                "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
2898                cli->cl_dirty_max, obd_max_dirty_pages,
2899                cli->cl_lost_grant, cli->cl_avail_grant);
2900
2901         /* force the caller to try sync io.  this can jump the list
2902          * of queued writes and create a discontiguous rpc stream */
2903         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
2904             cli->cl_dirty_max < CFS_PAGE_SIZE     ||
2905             cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
2906                 RETURN(-EDQUOT);
2907
2908         /* Hopefully normal case - cache space and write credits available */
2909         if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
2910             cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
2911             osc_enter_cache_try(env, cli, loi, oap, 0))
2912                 RETURN(0);
2913
2914         /* We can get here for two reasons: too many dirty pages in cache, or
2915          * run out of grants. In both cases we should write dirty pages out.
2916          * Adding a cache waiter will trigger urgent write-out no matter what
2917          * RPC size will be.
2918          * The exiting condition is no avail grants and no dirty pages caching,
2919          * that really means there is no space on the OST. */
2920         cfs_waitq_init(&ocw.ocw_waitq);
2921         ocw.ocw_oap = oap;
2922         while (cli->cl_dirty > 0) {
2923                 cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
2924                 ocw.ocw_rc = 0;
2925
2926                 loi_list_maint(cli, loi);
2927                 osc_check_rpcs(env, cli);
2928                 client_obd_list_unlock(&cli->cl_loi_list_lock);
2929
2930                 CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
2931                        cli->cl_import->imp_obd->obd_name, &ocw, oap);
2932
2933                 rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), &lwi);
2934
2935                 client_obd_list_lock(&cli->cl_loi_list_lock);
2936                 cfs_list_del_init(&ocw.ocw_entry);
2937                 if (rc < 0)
2938                         break;
2939
2940                 rc = ocw.ocw_rc;
2941                 if (rc != -EDQUOT)
2942                         break;
2943         }
2944
2945         RETURN(rc);
2946 }
2947
2948
2949 int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
2950                         struct lov_oinfo *loi, cfs_page_t *page,
2951                         obd_off offset, const struct obd_async_page_ops *ops,
2952                         void *data, void **res, int nocache,
2953                         struct lustre_handle *lockh)
2954 {
2955         struct osc_async_page *oap;
2956
2957         ENTRY;
2958
2959         if (!page)
2960                 return cfs_size_round(sizeof(*oap));
2961
2962         oap = *res;
2963         oap->oap_magic = OAP_MAGIC;
2964         oap->oap_cli = &exp->exp_obd->u.cli;
2965         oap->oap_loi = loi;
2966
2967         oap->oap_caller_ops = ops;
2968         oap->oap_caller_data = data;
2969
2970         oap->oap_page = page;
2971         oap->oap_obj_off = offset;
2972         if (!client_is_remote(exp) &&
2973             cfs_capable(CFS_CAP_SYS_RESOURCE))
2974                 oap->oap_brw_flags = OBD_BRW_NOQUOTA;
2975
2976         LASSERT(!(offset & ~CFS_PAGE_MASK));
2977
2978         CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
2979         CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
2980         CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
2981         CFS_INIT_LIST_HEAD(&oap->oap_page_list);
2982
2983         cfs_spin_lock_init(&oap->oap_lock);
2984         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
2985         RETURN(0);
2986 }
2987
2988 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
2989                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
2990                        struct osc_async_page *oap, int cmd, int off,
2991                        int count, obd_flag brw_flags, enum async_flags async_flags)
2992 {
2993         struct client_obd *cli = &exp->exp_obd->u.cli;
2994         int rc = 0;
2995         ENTRY;
2996
2997         if (oap->oap_magic != OAP_MAGIC)
2998                 RETURN(-EINVAL);
2999
3000         if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
3001                 RETURN(-EIO);
3002
3003         if (!cfs_list_empty(&oap->oap_pending_item) ||
3004             !cfs_list_empty(&oap->oap_urgent_item) ||
3005             !cfs_list_empty(&oap->oap_rpc_item))
3006                 RETURN(-EBUSY);
3007
3008         /* check if the file's owner/group is over quota */
3009         if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
3010                 struct cl_object *obj;
3011                 struct cl_attr    attr; /* XXX put attr into thread info */
3012                 unsigned int qid[MAXQUOTAS];
3013
3014                 obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
3015
3016                 cl_object_attr_lock(obj);
3017                 rc = cl_object_attr_get(env, obj, &attr);
3018                 cl_object_attr_unlock(obj);
3019
3020                 qid[USRQUOTA] = attr.cat_uid;
3021                 qid[GRPQUOTA] = attr.cat_gid;
3022                 if (rc == 0 &&
3023                     osc_quota_chkdq(cli, qid) == NO_QUOTA)
3024                         rc = -EDQUOT;
3025                 if (rc)
3026                         RETURN(rc);
3027         }
3028
3029         if (loi == NULL)
3030                 loi = lsm->lsm_oinfo[0];
3031
3032         client_obd_list_lock(&cli->cl_loi_list_lock);
3033
3034         LASSERT(off + count <= CFS_PAGE_SIZE);
3035         oap->oap_cmd = cmd;
3036         oap->oap_page_off = off;
3037         oap->oap_count = count;
3038         oap->oap_brw_flags = brw_flags;
3039         /* Give a hint to OST that requests are coming from kswapd - bug19529 */
3040         if (cfs_memory_pressure_get())
3041                 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
3042         cfs_spin_lock(&oap->oap_lock);
3043         oap->oap_async_flags = async_flags;
3044         cfs_spin_unlock(&oap->oap_lock);
3045
3046         if (cmd & OBD_BRW_WRITE) {
3047                 rc = osc_enter_cache(env, cli, loi, oap);
3048                 if (rc) {
3049                         client_obd_list_unlock(&cli->cl_loi_list_lock);
3050                         RETURN(rc);
3051                 }
3052         }
3053
3054         LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
3055                   cmd);
3056
3057         osc_oap_to_pending(oap);
3058         loi_list_maint(cli, loi);
3059         if (!osc_max_rpc_in_flight(cli, loi) &&
3060             lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
3061                 LASSERT(cli->cl_writeback_work != NULL);
3062                 rc = ptlrpcd_queue_work(cli->cl_writeback_work);
3063
3064                 CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
3065                        cli, rc);
3066         }
3067         client_obd_list_unlock(&cli->cl_loi_list_lock);
3068
3069         RETURN(0);
3070 }
3071
3072 /* aka (~was & now & flag), but this is more clear :) */
3073 #define SETTING(was, now, flag) (!(was & flag) && (now & flag))
3074
3075 int osc_set_async_flags_base(struct client_obd *cli,
3076                              struct lov_oinfo *loi, struct osc_async_page *oap,
3077                              obd_flag async_flags)
3078 {
3079         struct loi_oap_pages *lop;
3080         int flags = 0;
3081         ENTRY;
3082
3083         LASSERT(!cfs_list_empty(&oap->oap_pending_item));
3084
3085         if (oap->oap_cmd & OBD_BRW_WRITE) {
3086                 lop = &loi->loi_write_lop;
3087         } else {
3088                 lop = &loi->loi_read_lop;
3089         }
3090
3091         if ((oap->oap_async_flags & async_flags) == async_flags)
3092                 RETURN(0);
3093
3094         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
3095                 flags |= ASYNC_READY;
3096
3097         if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
3098             cfs_list_empty(&oap->oap_rpc_item)) {
3099                 if (oap->oap_async_flags & ASYNC_HP)
3100                         cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
3101                 else
3102                         cfs_list_add_tail(&oap->oap_urgent_item,
3103                                           &lop->lop_urgent);
3104                 flags |= ASYNC_URGENT;
3105                 loi_list_maint(cli, loi);
3106         }
3107         cfs_spin_lock(&oap->oap_lock);
3108         oap->oap_async_flags |= flags;
3109         cfs_spin_unlock(&oap->oap_lock);
3110
3111         LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
3112                         oap->oap_async_flags);
3113         RETURN(0);
3114 }
3115
3116 int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
3117                             struct lov_oinfo *loi, struct osc_async_page *oap)
3118 {
3119         struct client_obd *cli = &exp->exp_obd->u.cli;
3120         struct loi_oap_pages *lop;
3121         int rc = 0;
3122         ENTRY;
3123
3124         if (oap->oap_magic != OAP_MAGIC)
3125                 RETURN(-EINVAL);
3126
3127         if (loi == NULL)
3128                 loi = lsm->lsm_oinfo[0];
3129
3130         if (oap->oap_cmd & OBD_BRW_WRITE) {
3131                 lop = &loi->loi_write_lop;
3132         } else {
3133                 lop = &loi->loi_read_lop;
3134         }
3135
3136         client_obd_list_lock(&cli->cl_loi_list_lock);
3137
3138         if (!cfs_list_empty(&oap->oap_rpc_item))
3139                 GOTO(out, rc = -EBUSY);
3140
3141         osc_exit_cache(cli, oap, 0);
3142         osc_wake_cache_waiters(cli);
3143
3144         if (!cfs_list_empty(&oap->oap_urgent_item)) {
3145                 cfs_list_del_init(&oap->oap_urgent_item);
3146                 cfs_spin_lock(&oap->oap_lock);
3147                 oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
3148                 cfs_spin_unlock(&oap->oap_lock);
3149         }
3150         if (!cfs_list_empty(&oap->oap_pending_item)) {
3151                 cfs_list_del_init(&oap->oap_pending_item);
3152                 lop_update_pending(cli, lop, oap->oap_cmd, -1);
3153         }
3154         loi_list_maint(cli, loi);
3155         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
3156 out:
3157         client_obd_list_unlock(&cli->cl_loi_list_lock);
3158         RETURN(rc);
3159 }
3160
3161 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
3162                                         struct ldlm_enqueue_info *einfo)
3163 {
3164         void *data = einfo->ei_cbdata;
3165         int set = 0;
3166
3167         LASSERT(lock != NULL);
3168         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
3169         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
3170         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
3171         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
3172
3173         lock_res_and_lock(lock);
3174         cfs_spin_lock(&osc_ast_guard);
3175
3176         if (lock->l_ast_data == NULL)
3177                 lock->l_ast_data = data;
3178         if (lock->l_ast_data == data)
3179                 set = 1;
3180
3181         cfs_spin_unlock(&osc_ast_guard);
3182         unlock_res_and_lock(lock);
3183
3184         return set;
3185 }
3186
3187 static int osc_set_data_with_check(struct lustre_handle *lockh,
3188                                    struct ldlm_enqueue_info *einfo)
3189 {
3190         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
3191         int set = 0;
3192
3193         if (lock != NULL) {
3194                 set = osc_set_lock_data_with_check(lock, einfo);
3195                 LDLM_LOCK_PUT(lock);
3196         } else
3197                 CERROR("lockh %p, data %p - client evicted?\n",
3198                        lockh, einfo->ei_cbdata);
3199         return set;
3200 }
3201
3202 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3203                              ldlm_iterator_t replace, void *data)
3204 {
3205         struct ldlm_res_id res_id;
3206         struct obd_device *obd = class_exp2obd(exp);
3207
3208         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3209         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3210         return 0;
3211 }
3212
3213 /* find any ldlm lock of the inode in osc
3214  * return 0    not find
3215  *        1    find one
3216  *      < 0    error */
3217 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
3218                            ldlm_iterator_t replace, void *data)
3219 {
3220         struct ldlm_res_id res_id;
3221         struct obd_device *obd = class_exp2obd(exp);
3222         int rc = 0;
3223
3224         osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
3225         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
3226         if (rc == LDLM_ITER_STOP)
3227                 return(1);
3228         if (rc == LDLM_ITER_CONTINUE)
3229                 return(0);
3230         return(rc);
3231 }
3232
3233 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
3234                             obd_enqueue_update_f upcall, void *cookie,
3235                             int *flags, int agl, int rc)
3236 {
3237         int intent = *flags & LDLM_FL_HAS_INTENT;
3238         ENTRY;
3239
3240         if (intent) {
3241                 /* The request was created before ldlm_cli_enqueue call. */
3242                 if (rc == ELDLM_LOCK_ABORTED) {
3243                         struct ldlm_reply *rep;
3244                         rep = req_capsule_server_get(&req->rq_pill,
3245                                                      &RMF_DLM_REP);
3246
3247                         LASSERT(rep != NULL);
3248                         if (rep->lock_policy_res1)
3249                                 rc = rep->lock_policy_res1;
3250                 }
3251         }
3252
3253         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
3254             (rc == 0)) {
3255                 *flags |= LDLM_FL_LVB_READY;
3256                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
3257                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
3258         }
3259
3260         /* Call the update callback. */
3261         rc = (*upcall)(cookie, rc);
3262         RETURN(rc);
3263 }
3264
3265 static int osc_enqueue_interpret(const struct lu_env *env,
3266                                  struct ptlrpc_request *req,
3267                                  struct osc_enqueue_args *aa, int rc)
3268 {
3269         struct ldlm_lock *lock;
3270         struct lustre_handle handle;
3271         __u32 mode;
3272         struct ost_lvb *lvb;
3273         __u32 lvb_len;
3274         int *flags = aa->oa_flags;
3275
3276         /* Make a local copy of a lock handle and a mode, because aa->oa_*
3277          * might be freed anytime after lock upcall has been called. */
3278         lustre_handle_copy(&handle, aa->oa_lockh);
3279         mode = aa->oa_ei->ei_mode;
3280
3281         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
3282          * be valid. */
3283         lock = ldlm_handle2lock(&handle);
3284
3285         /* Take an additional reference so that a blocking AST that
3286          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
3287          * to arrive after an upcall has been executed by
3288          * osc_enqueue_fini(). */
3289         ldlm_lock_addref(&handle, mode);
3290
3291         /* Let CP AST to grant the lock first. */
3292         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
3293
3294         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
3295                 lvb = NULL;
3296                 lvb_len = 0;
3297         } else {
3298                 lvb = aa->oa_lvb;
3299                 lvb_len = sizeof(*aa->oa_lvb);
3300         }
3301
3302         /* Complete obtaining the lock procedure. */
3303         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
3304                                    mode, flags, lvb, lvb_len, &handle, rc);
3305         /* Complete osc stuff. */
3306         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
3307                               flags, aa->oa_agl, rc);
3308
3309         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
3310
3311         /* Release the lock for async request. */
3312         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
3313                 /*
3314                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
3315                  * not already released by
3316                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
3317                  */
3318                 ldlm_lock_decref(&handle, mode);
3319
3320         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
3321                  aa->oa_lockh, req, aa);
3322         ldlm_lock_decref(&handle, mode);
3323         LDLM_LOCK_PUT(lock);
3324         return rc;
3325 }
3326
3327 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
3328                         struct lov_oinfo *loi, int flags,
3329                         struct ost_lvb *lvb, __u32 mode, int rc)
3330 {
3331         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
3332
3333         if (rc == ELDLM_OK) {
3334                 __u64 tmp;
3335
3336                 LASSERT(lock != NULL);
3337                 loi->loi_lvb = *lvb;
3338                 tmp = loi->loi_lvb.lvb_size;
3339                 /* Extend KMS up to the end of this lock and no further
3340                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
3341                 if (tmp > lock->l_policy_data.l_extent.end)
3342                         tmp = lock->l_policy_data.l_extent.end + 1;
3343                 if (tmp >= loi->loi_kms) {
3344                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
3345                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
3346                         loi_kms_set(loi, tmp);
3347                 } else {
3348                         LDLM_DEBUG(lock, "lock acquired, setting rss="
3349                                    LPU64"; leaving kms="LPU64", end="LPU64,
3350                                    loi->loi_lvb.lvb_size, loi->loi_kms,
3351                                    lock->l_policy_data.l_extent.end);
3352                 }
3353                 ldlm_lock_allow_match(lock);
3354         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
3355                 LASSERT(lock != NULL);
3356                 loi->loi_lvb = *lvb;
3357                 ldlm_lock_allow_match(lock);
3358                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
3359                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
3360                 rc = ELDLM_OK;
3361         }
3362
3363         if (lock != NULL) {
3364                 if (rc != ELDLM_OK)
3365                         ldlm_lock_fail_match(lock);
3366
3367                 LDLM_LOCK_PUT(lock);
3368         }
3369 }
3370 EXPORT_SYMBOL(osc_update_enqueue);
3371
3372 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
3373
3374 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
3375  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
3376  * other synchronous requests, however keeping some locks and trying to obtain
3377  * others may take a considerable amount of time in a case of ost failure; and
3378  * when other sync requests do not get released lock from a client, the client
3379  * is excluded from the cluster -- such scenarious make the life difficult, so
3380  * release locks just after they are obtained. */
3381 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3382                      int *flags, ldlm_policy_data_t *policy,
3383                      struct ost_lvb *lvb, int kms_valid,
3384                      obd_enqueue_update_f upcall, void *cookie,
3385                      struct ldlm_enqueue_info *einfo,
3386                      struct lustre_handle *lockh,
3387                      struct ptlrpc_request_set *rqset, int async, int agl)
3388 {
3389         struct obd_device *obd = exp->exp_obd;
3390         struct ptlrpc_request *req = NULL;
3391         int intent = *flags & LDLM_FL_HAS_INTENT;
3392         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
3393         ldlm_mode_t mode;
3394         int rc;
3395         ENTRY;
3396
3397         /* Filesystem lock extents are extended to page boundaries so that
3398          * dealing with the page cache is a little smoother.  */
3399         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3400         policy->l_extent.end |= ~CFS_PAGE_MASK;
3401
3402         /*
3403          * kms is not valid when either object is completely fresh (so that no
3404          * locks are cached), or object was evicted. In the latter case cached
3405          * lock cannot be used, because it would prime inode state with
3406          * potentially stale LVB.
3407          */
3408         if (!kms_valid)
3409                 goto no_match;
3410
3411         /* Next, search for already existing extent locks that will cover us */
3412         /* If we're trying to read, we also search for an existing PW lock.  The
3413          * VFS and page cache already protect us locally, so lots of readers/
3414          * writers can share a single PW lock.
3415          *
3416          * There are problems with conversion deadlocks, so instead of
3417          * converting a read lock to a write lock, we'll just enqueue a new
3418          * one.
3419          *
3420          * At some point we should cancel the read lock instead of making them
3421          * send us a blocking callback, but there are problems with canceling
3422          * locks out from other users right now, too. */
3423         mode = einfo->ei_mode;
3424         if (einfo->ei_mode == LCK_PR)
3425                 mode |= LCK_PW;
3426         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
3427                                einfo->ei_type, policy, mode, lockh, 0);
3428         if (mode) {
3429                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
3430
3431                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
3432                         /* For AGL, if enqueue RPC is sent but the lock is not
3433                          * granted, then skip to process this strpe.
3434                          * Return -ECANCELED to tell the caller. */
3435                         ldlm_lock_decref(lockh, mode);
3436                         LDLM_LOCK_PUT(matched);
3437                         RETURN(-ECANCELED);
3438                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
3439                         *flags |= LDLM_FL_LVB_READY;
3440                         /* addref the lock only if not async requests and PW
3441                          * lock is matched whereas we asked for PR. */
3442                         if (!rqset && einfo->ei_mode != mode)
3443                                 ldlm_lock_addref(lockh, LCK_PR);
3444                         if (intent) {
3445                                 /* I would like to be able to ASSERT here that
3446                                  * rss <= kms, but I can't, for reasons which
3447                                  * are explained in lov_enqueue() */
3448                         }
3449
3450                         /* We already have a lock, and it's referenced */
3451                         (*upcall)(cookie, ELDLM_OK);
3452
3453                         if (einfo->ei_mode != mode)
3454                                 ldlm_lock_decref(lockh, LCK_PW);
3455                         else if (rqset)
3456                                 /* For async requests, decref the lock. */
3457                                 ldlm_lock_decref(lockh, einfo->ei_mode);
3458                         LDLM_LOCK_PUT(matched);
3459                         RETURN(ELDLM_OK);
3460                 } else {
3461                         ldlm_lock_decref(lockh, mode);
3462                         LDLM_LOCK_PUT(matched);
3463                 }
3464         }
3465
3466  no_match:
3467         if (intent) {
3468                 CFS_LIST_HEAD(cancels);
3469                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3470                                            &RQF_LDLM_ENQUEUE_LVB);
3471                 if (req == NULL)
3472                         RETURN(-ENOMEM);
3473
3474                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
3475                 if (rc) {
3476                         ptlrpc_request_free(req);
3477                         RETURN(rc);
3478                 }
3479
3480                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
3481                                      sizeof *lvb);
3482                 ptlrpc_request_set_replen(req);
3483         }
3484
3485         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
3486         *flags &= ~LDLM_FL_BLOCK_GRANTED;
3487
3488         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
3489                               sizeof(*lvb), lockh, async);
3490         if (rqset) {
3491                 if (!rc) {
3492                         struct osc_enqueue_args *aa;
3493                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3494                         aa = ptlrpc_req_async_args(req);
3495                         aa->oa_ei = einfo;
3496                         aa->oa_exp = exp;
3497                         aa->oa_flags  = flags;
3498                         aa->oa_upcall = upcall;
3499                         aa->oa_cookie = cookie;
3500                         aa->oa_lvb    = lvb;
3501                         aa->oa_lockh  = lockh;
3502                         aa->oa_agl    = !!agl;
3503
3504                         req->rq_interpret_reply =
3505                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
3506                         if (rqset == PTLRPCD_SET)
3507                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3508                         else
3509                                 ptlrpc_set_add_req(rqset, req);
3510                 } else if (intent) {
3511                         ptlrpc_req_finished(req);
3512                 }
3513                 RETURN(rc);
3514         }
3515
3516         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
3517         if (intent)
3518                 ptlrpc_req_finished(req);
3519
3520         RETURN(rc);
3521 }
3522
3523 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
3524                        struct ldlm_enqueue_info *einfo,
3525                        struct ptlrpc_request_set *rqset)
3526 {
3527         struct ldlm_res_id res_id;
3528         int rc;
3529         ENTRY;
3530
3531         osc_build_res_name(oinfo->oi_md->lsm_object_id,
3532                            oinfo->oi_md->lsm_object_seq, &res_id);
3533
3534         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
3535                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
3536                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
3537                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
3538                               rqset, rqset != NULL, 0);
3539         RETURN(rc);
3540 }
3541
3542 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
3543                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
3544                    int *flags, void *data, struct lustre_handle *lockh,
3545                    int unref)
3546 {
3547         struct obd_device *obd = exp->exp_obd;
3548         int lflags = *flags;
3549         ldlm_mode_t rc;
3550         ENTRY;
3551
3552         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
3553                 RETURN(-EIO);
3554
3555         /* Filesystem lock extents are extended to page boundaries so that
3556          * dealing with the page cache is a little smoother */
3557         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
3558         policy->l_extent.end |= ~CFS_PAGE_MASK;
3559
3560         /* Next, search for already existing extent locks that will cover us */
3561         /* If we're trying to read, we also search for an existing PW lock.  The
3562          * VFS and page cache already protect us locally, so lots of readers/
3563          * writers can share a single PW lock. */
3564         rc = mode;
3565         if (mode == LCK_PR)
3566                 rc |= LCK_PW;
3567         rc = ldlm_lock_match(obd->obd_namespace, lflags,
3568                              res_id, type, policy, rc, lockh, unref);
3569         if (rc) {
3570                 if (data != NULL) {
3571                         if (!osc_set_data_with_check(lockh, data)) {
3572                                 if (!(lflags & LDLM_FL_TEST_LOCK))
3573                                         ldlm_lock_decref(lockh, rc);
3574                                 RETURN(0);
3575                         }
3576                 }
3577                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
3578                         ldlm_lock_addref(lockh, LCK_PR);
3579                         ldlm_lock_decref(lockh, LCK_PW);
3580                 }
3581                 RETURN(rc);
3582         }
3583         RETURN(rc);
3584 }
3585
3586 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
3587 {
3588         ENTRY;
3589
3590         if (unlikely(mode == LCK_GROUP))
3591                 ldlm_lock_decref_and_cancel(lockh, mode);
3592         else
3593                 ldlm_lock_decref(lockh, mode);
3594
3595         RETURN(0);
3596 }
3597
3598 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
3599                       __u32 mode, struct lustre_handle *lockh)
3600 {
3601         ENTRY;
3602         RETURN(osc_cancel_base(lockh, mode));
3603 }
3604
3605 static int osc_cancel_unused(struct obd_export *exp,
3606                              struct lov_stripe_md *lsm,
3607                              ldlm_cancel_flags_t flags,
3608                              void *opaque)
3609 {
3610         struct obd_device *obd = class_exp2obd(exp);
3611         struct ldlm_res_id res_id, *resp = NULL;
3612
3613         if (lsm != NULL) {
3614                 resp = osc_build_res_name(lsm->lsm_object_id,
3615                                           lsm->lsm_object_seq, &res_id);
3616         }
3617
3618         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
3619 }
3620
3621 static int osc_statfs_interpret(const struct lu_env *env,
3622                                 struct ptlrpc_request *req,
3623                                 struct osc_async_args *aa, int rc)
3624 {
3625         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
3626         struct obd_statfs *msfs;
3627         __u64 used;
3628         ENTRY;
3629
3630         if (rc == -EBADR)
3631                 /* The request has in fact never been sent
3632                  * due to issues at a higher level (LOV).
3633                  * Exit immediately since the caller is
3634                  * aware of the problem and takes care
3635                  * of the clean up */
3636                  RETURN(rc);
3637
3638         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
3639             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
3640                 GOTO(out, rc = 0);
3641
3642         if (rc != 0)
3643                 GOTO(out, rc);
3644
3645         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3646         if (msfs == NULL) {
3647                 GOTO(out, rc = -EPROTO);
3648         }
3649
3650         /* Reinitialize the RDONLY and DEGRADED flags at the client
3651          * on each statfs, so they don't stay set permanently. */
3652         cfs_spin_lock(&cli->cl_oscc.oscc_lock);
3653
3654         if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
3655                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
3656         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
3657                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
3658
3659         if (unlikely(msfs->os_state & OS_STATE_READONLY))
3660                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
3661         else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
3662                 cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
3663
3664         /* Add a bit of hysteresis so this flag isn't continually flapping,
3665          * and ensure that new files don't get extremely fragmented due to
3666          * only a small amount of available space in the filesystem.
3667          * We want to set the NOSPC flag when there is less than ~0.1% free
3668          * and clear it when there is at least ~0.2% free space, so:
3669          *                   avail < ~0.1% max          max = avail + used
3670          *            1025 * avail < avail + used       used = blocks - free
3671          *            1024 * avail < used
3672          *            1024 * avail < blocks - free
3673          *                   avail < ((blocks - free) >> 10)
3674          *
3675          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
3676          * lose that amount of space so in those cases we report no space left
3677          * if their is less than 1 GB left.                             */
3678         used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
3679         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
3680                      ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
3681                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
3682         else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3683                           (msfs->os_ffree > 64) &&
3684                           (msfs->os_bavail > (used << 1)))) {
3685                 cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
3686                                              OSCC_FLAG_NOSPC_BLK);
3687         }
3688
3689         if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
3690                      (msfs->os_bavail < used)))
3691                 cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
3692
3693         cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
3694
3695         *aa->aa_oi->oi_osfs = *msfs;
3696 out:
3697         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
3698         RETURN(rc);
3699 }
3700
3701 static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
3702                             __u64 max_age, struct ptlrpc_request_set *rqset)
3703 {
3704         struct ptlrpc_request *req;
3705         struct osc_async_args *aa;
3706         int                    rc;
3707         ENTRY;
3708
3709         /* We could possibly pass max_age in the request (as an absolute
3710          * timestamp or a "seconds.usec ago") so the target can avoid doing
3711          * extra calls into the filesystem if that isn't necessary (e.g.
3712          * during mount that would help a bit).  Having relative timestamps
3713          * is not so great if request processing is slow, while absolute
3714          * timestamps are not ideal because they need time synchronization. */
3715         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
3716         if (req == NULL)
3717                 RETURN(-ENOMEM);
3718
3719         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3720         if (rc) {
3721                 ptlrpc_request_free(req);
3722                 RETURN(rc);
3723         }
3724         ptlrpc_request_set_replen(req);
3725         req->rq_request_portal = OST_CREATE_PORTAL;
3726         ptlrpc_at_set_req_timeout(req);
3727
3728         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
3729                 /* procfs requests not want stat in wait for avoid deadlock */
3730                 req->rq_no_resend = 1;
3731                 req->rq_no_delay = 1;
3732         }
3733
3734         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
3735         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
3736         aa = ptlrpc_req_async_args(req);
3737         aa->aa_oi = oinfo;
3738
3739         ptlrpc_set_add_req(rqset, req);
3740         RETURN(0);
3741 }
3742
3743 static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
3744                       __u64 max_age, __u32 flags)
3745 {
3746         struct obd_statfs     *msfs;
3747         struct ptlrpc_request *req;
3748         struct obd_import     *imp = NULL;
3749         int rc;
3750         ENTRY;
3751
3752         /*Since the request might also come from lprocfs, so we need
3753          *sync this with client_disconnect_export Bug15684*/
3754         cfs_down_read(&obd->u.cli.cl_sem);
3755         if (obd->u.cli.cl_import)
3756                 imp = class_import_get(obd->u.cli.cl_import);
3757         cfs_up_read(&obd->u.cli.cl_sem);
3758         if (!imp)
3759                 RETURN(-ENODEV);
3760
3761         /* We could possibly pass max_age in the request (as an absolute
3762          * timestamp or a "seconds.usec ago") so the target can avoid doing
3763          * extra calls into the filesystem if that isn't necessary (e.g.
3764          * during mount that would help a bit).  Having relative timestamps
3765          * is not so great if request processing is slow, while absolute
3766          * timestamps are not ideal because they need time synchronization. */
3767         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
3768
3769         class_import_put(imp);
3770
3771         if (req == NULL)
3772                 RETURN(-ENOMEM);
3773
3774         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
3775         if (rc) {
3776                 ptlrpc_request_free(req);
3777                 RETURN(rc);
3778         }
3779         ptlrpc_request_set_replen(req);
3780         req->rq_request_portal = OST_CREATE_PORTAL;
3781         ptlrpc_at_set_req_timeout(req);
3782
3783         if (flags & OBD_STATFS_NODELAY) {
3784                 /* procfs requests not want stat in wait for avoid deadlock */
3785                 req->rq_no_resend = 1;
3786                 req->rq_no_delay = 1;
3787         }
3788
3789         rc = ptlrpc_queue_wait(req);
3790         if (rc)
3791                 GOTO(out, rc);
3792
3793         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
3794         if (msfs == NULL) {
3795                 GOTO(out, rc = -EPROTO);
3796         }
3797
3798         *osfs = *msfs;
3799
3800         EXIT;
3801  out:
3802         ptlrpc_req_finished(req);
3803         return rc;
3804 }
3805
3806 /* Retrieve object striping information.
3807  *
3808  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
3809  * the maximum number of OST indices which will fit in the user buffer.
3810  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
3811  */
3812 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
3813 {
3814         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
3815         struct lov_user_md_v3 lum, *lumk;
3816         struct lov_user_ost_data_v1 *lmm_objects;
3817         int rc = 0, lum_size;
3818         ENTRY;
3819
3820         if (!lsm)
3821                 RETURN(-ENODATA);
3822
3823         /* we only need the header part from user space to get lmm_magic and
3824          * lmm_stripe_count, (the header part is common to v1 and v3) */
3825         lum_size = sizeof(struct lov_user_md_v1);
3826         if (cfs_copy_from_user(&lum, lump, lum_size))
3827                 RETURN(-EFAULT);
3828
3829         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
3830             (lum.lmm_magic != LOV_USER_MAGIC_V3))
3831                 RETURN(-EINVAL);
3832
3833         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
3834         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
3835         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
3836         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
3837
3838         /* we can use lov_mds_md_size() to compute lum_size
3839          * because lov_user_md_vX and lov_mds_md_vX have the same size */
3840         if (lum.lmm_stripe_count > 0) {
3841                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
3842                 OBD_ALLOC(lumk, lum_size);
3843                 if (!lumk)
3844                         RETURN(-ENOMEM);
3845
3846                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
3847                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
3848                 else
3849                         lmm_objects = &(lumk->lmm_objects[0]);
3850                 lmm_objects->l_object_id = lsm->lsm_object_id;
3851         } else {
3852                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
3853                 lumk = &lum;
3854         }
3855
3856         lumk->lmm_object_id = lsm->lsm_object_id;
3857         lumk->lmm_object_seq = lsm->lsm_object_seq;
3858         lumk->lmm_stripe_count = 1;
3859
3860         if (cfs_copy_to_user(lump, lumk, lum_size))
3861                 rc = -EFAULT;
3862
3863         if (lumk != &lum)
3864                 OBD_FREE(lumk, lum_size);
3865
3866         RETURN(rc);
3867 }
3868
3869
3870 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3871                          void *karg, void *uarg)
3872 {
3873         struct obd_device *obd = exp->exp_obd;
3874         struct obd_ioctl_data *data = karg;
3875         int err = 0;
3876         ENTRY;
3877
3878         if (!cfs_try_module_get(THIS_MODULE)) {
3879                 CERROR("Can't get module. Is it alive?");
3880                 return -EINVAL;
3881         }
3882         switch (cmd) {
3883         case OBD_IOC_LOV_GET_CONFIG: {
3884                 char *buf;
3885                 struct lov_desc *desc;
3886                 struct obd_uuid uuid;
3887
3888                 buf = NULL;
3889                 len = 0;
3890                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
3891                         GOTO(out, err = -EINVAL);
3892
3893                 data = (struct obd_ioctl_data *)buf;
3894
3895                 if (sizeof(*desc) > data->ioc_inllen1) {
3896                         obd_ioctl_freedata(buf, len);
3897                         GOTO(out, err = -EINVAL);
3898                 }
3899
3900                 if (data->ioc_inllen2 < sizeof(uuid)) {
3901                         obd_ioctl_freedata(buf, len);
3902                         GOTO(out, err = -EINVAL);
3903                 }
3904
3905                 desc = (struct lov_desc *)data->ioc_inlbuf1;
3906                 desc->ld_tgt_count = 1;
3907                 desc->ld_active_tgt_count = 1;
3908                 desc->ld_default_stripe_count = 1;
3909                 desc->ld_default_stripe_size = 0;
3910                 desc->ld_default_stripe_offset = 0;
3911                 desc->ld_pattern = 0;
3912                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
3913
3914                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
3915
3916                 err = cfs_copy_to_user((void *)uarg, buf, len);
3917                 if (err)
3918                         err = -EFAULT;
3919                 obd_ioctl_freedata(buf, len);
3920                 GOTO(out, err);
3921         }
3922         case LL_IOC_LOV_SETSTRIPE:
3923                 err = obd_alloc_memmd(exp, karg);
3924                 if (err > 0)
3925                         err = 0;
3926                 GOTO(out, err);
3927         case LL_IOC_LOV_GETSTRIPE:
3928                 err = osc_getstripe(karg, uarg);
3929                 GOTO(out, err);
3930         case OBD_IOC_CLIENT_RECOVER:
3931                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
3932                                             data->ioc_inlbuf1, 0);
3933                 if (err > 0)
3934                         err = 0;
3935                 GOTO(out, err);
3936         case IOC_OSC_SET_ACTIVE:
3937                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
3938                                                data->ioc_offset);
3939                 GOTO(out, err);
3940         case OBD_IOC_POLL_QUOTACHECK:
3941                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3942                 GOTO(out, err);
3943         case OBD_IOC_PING_TARGET:
3944                 err = ptlrpc_obd_ping(obd);
3945                 GOTO(out, err);
3946         default:
3947                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3948                        cmd, cfs_curproc_comm());
3949                 GOTO(out, err = -ENOTTY);
3950         }
3951 out:
3952         cfs_module_put(THIS_MODULE);
3953         return err;
3954 }
3955
3956 static int osc_get_info(struct obd_export *exp, obd_count keylen,
3957                         void *key, __u32 *vallen, void *val,
3958                         struct lov_stripe_md *lsm)
3959 {
3960         ENTRY;
3961         if (!vallen || !val)
3962                 RETURN(-EFAULT);
3963
3964         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3965                 __u32 *stripe = val;
3966                 *vallen = sizeof(*stripe);
3967                 *stripe = 0;
3968                 RETURN(0);
3969         } else if (KEY_IS(KEY_LAST_ID)) {
3970                 struct ptlrpc_request *req;
3971                 obd_id                *reply;
3972                 char                  *tmp;
3973                 int                    rc;
3974
3975                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3976                                            &RQF_OST_GET_INFO_LAST_ID);
3977                 if (req == NULL)
3978                         RETURN(-ENOMEM);
3979
3980                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3981                                      RCL_CLIENT, keylen);
3982                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3983                 if (rc) {
3984                         ptlrpc_request_free(req);
3985                         RETURN(rc);
3986                 }
3987
3988                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3989                 memcpy(tmp, key, keylen);
3990
3991                 req->rq_no_delay = req->rq_no_resend = 1;
3992                 ptlrpc_request_set_replen(req);
3993                 rc = ptlrpc_queue_wait(req);
3994                 if (rc)
3995                         GOTO(out, rc);
3996
3997                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3998                 if (reply == NULL)
3999                         GOTO(out, rc = -EPROTO);
4000
4001                 *((obd_id *)val) = *reply;
4002         out:
4003                 ptlrpc_req_finished(req);
4004                 RETURN(rc);
4005         } else if (KEY_IS(KEY_FIEMAP)) {
4006                 struct ptlrpc_request *req;
4007                 struct ll_user_fiemap *reply;
4008                 char *tmp;
4009                 int rc;
4010
4011                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
4012                                            &RQF_OST_GET_INFO_FIEMAP);
4013                 if (req == NULL)
4014                         RETURN(-ENOMEM);
4015
4016                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
4017                                      RCL_CLIENT, keylen);
4018                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4019                                      RCL_CLIENT, *vallen);
4020                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
4021                                      RCL_SERVER, *vallen);
4022
4023                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
4024                 if (rc) {
4025                         ptlrpc_request_free(req);
4026                         RETURN(rc);
4027                 }
4028
4029                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
4030                 memcpy(tmp, key, keylen);
4031                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4032                 memcpy(tmp, val, *vallen);
4033
4034                 ptlrpc_request_set_replen(req);
4035                 rc = ptlrpc_queue_wait(req);
4036                 if (rc)
4037                         GOTO(out1, rc);
4038
4039                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
4040                 if (reply == NULL)
4041                         GOTO(out1, rc = -EPROTO);
4042
4043                 memcpy(val, reply, *vallen);
4044         out1:
4045                 ptlrpc_req_finished(req);
4046
4047                 RETURN(rc);
4048         }
4049
4050         RETURN(-EINVAL);
4051 }
4052
4053 static int osc_setinfo_mds_connect_import(struct obd_import *imp)
4054 {
4055         struct llog_ctxt *ctxt;
4056         int rc = 0;
4057         ENTRY;
4058
4059         ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
4060         if (ctxt) {
4061                 rc = llog_initiator_connect(ctxt);
4062                 llog_ctxt_put(ctxt);
4063         } else {
4064                 /* XXX return an error? skip setting below flags? */
4065         }
4066
4067         cfs_spin_lock(&imp->imp_lock);
4068         imp->imp_server_timeout = 1;
4069         imp->imp_pingable = 1;
4070         cfs_spin_unlock(&imp->imp_lock);
4071         CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
4072
4073         RETURN(rc);
4074 }
4075
4076 static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
4077                                           struct ptlrpc_request *req,
4078                                           void *aa, int rc)
4079 {
4080         ENTRY;
4081         if (rc != 0)
4082                 RETURN(rc);
4083
4084         RETURN(osc_setinfo_mds_connect_import(req->rq_import));
4085 }
4086
4087 static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
4088                               void *key, obd_count vallen, void *val,
4089                               struct ptlrpc_request_set *set)
4090 {
4091         struct ptlrpc_request *req;
4092         struct obd_device     *obd = exp->exp_obd;
4093         struct obd_import     *imp = class_exp2cliimp(exp);
4094         char                  *tmp;
4095         int                    rc;
4096         ENTRY;
4097
4098         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
4099
4100         if (KEY_IS(KEY_NEXT_ID)) {
4101                 obd_id new_val;
4102                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4103
4104                 if (vallen != sizeof(obd_id))
4105                         RETURN(-ERANGE);
4106                 if (val == NULL)
4107                         RETURN(-EINVAL);
4108
4109                 if (vallen != sizeof(obd_id))
4110                         RETURN(-EINVAL);
4111
4112                 /* avoid race between allocate new object and set next id
4113                  * from ll_sync thread */
4114                 cfs_spin_lock(&oscc->oscc_lock);
4115                 new_val = *((obd_id*)val) + 1;
4116                 if (new_val > oscc->oscc_next_id)
4117                         oscc->oscc_next_id = new_val;
4118                 cfs_spin_unlock(&oscc->oscc_lock);
4119                 CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
4120                        exp->exp_obd->obd_name,
4121                        obd->u.cli.cl_oscc.oscc_next_id);
4122
4123                 RETURN(0);
4124         }
4125
4126         if (KEY_IS(KEY_CHECKSUM)) {
4127                 if (vallen != sizeof(int))
4128                         RETURN(-EINVAL);
4129                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
4130                 RETURN(0);
4131         }
4132
4133         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4134                 sptlrpc_conf_client_adapt(obd);
4135                 RETURN(0);
4136         }
4137
4138         if (KEY_IS(KEY_FLUSH_CTX)) {
4139                 sptlrpc_import_flush_my_ctx(imp);
4140                 RETURN(0);
4141         }
4142
4143         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
4144                 RETURN(-EINVAL);
4145
4146         /* We pass all other commands directly to OST. Since nobody calls osc
4147            methods directly and everybody is supposed to go through LOV, we
4148            assume lov checked invalid values for us.
4149            The only recognised values so far are evict_by_nid and mds_conn.
4150            Even if something bad goes through, we'd get a -EINVAL from OST
4151            anyway. */
4152
4153         if (KEY_IS(KEY_GRANT_SHRINK))
4154                 req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
4155         else
4156                 req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
4157
4158         if (req == NULL)
4159                 RETURN(-ENOMEM);
4160
4161         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
4162                              RCL_CLIENT, keylen);
4163         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
4164                              RCL_CLIENT, vallen);
4165         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
4166         if (rc) {
4167                 ptlrpc_request_free(req);
4168                 RETURN(rc);
4169         }
4170
4171         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
4172         memcpy(tmp, key, keylen);
4173         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
4174         memcpy(tmp, val, vallen);
4175
4176         if (KEY_IS(KEY_MDS_CONN)) {
4177                 struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4178
4179                 oscc->oscc_oa.o_seq = (*(__u32 *)val);
4180                 oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
4181                 LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
4182                 req->rq_no_delay = req->rq_no_resend = 1;
4183                 req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
4184         } else if (KEY_IS(KEY_GRANT_SHRINK)) {
4185                 struct osc_grant_args *aa;
4186                 struct obdo *oa;
4187
4188                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
4189                 aa = ptlrpc_req_async_args(req);
4190                 OBDO_ALLOC(oa);
4191                 if (!oa) {
4192                         ptlrpc_req_finished(req);
4193                         RETURN(-ENOMEM);
4194                 }
4195                 *oa = ((struct ost_body *)val)->oa;
4196                 aa->aa_oa = oa;
4197                 req->rq_interpret_reply = osc_shrink_grant_interpret;
4198         }
4199
4200         ptlrpc_request_set_replen(req);
4201         if (!KEY_IS(KEY_GRANT_SHRINK)) {
4202                 LASSERT(set != NULL);
4203                 ptlrpc_set_add_req(set, req);
4204                 ptlrpc_check_set(NULL, set);
4205         } else
4206                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
4207
4208         RETURN(0);
4209 }
4210
4211
4212 static struct llog_operations osc_size_repl_logops = {
4213         lop_cancel: llog_obd_repl_cancel
4214 };
4215
4216 static struct llog_operations osc_mds_ost_orig_logops;
4217
4218 static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4219                            struct obd_device *tgt, struct llog_catid *catid)
4220 {
4221         int rc;
4222         ENTRY;
4223
4224         rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
4225                         &catid->lci_logid, &osc_mds_ost_orig_logops);
4226         if (rc) {
4227                 CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
4228                 GOTO(out, rc);
4229         }
4230
4231         rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
4232                         NULL, &osc_size_repl_logops);
4233         if (rc) {
4234                 struct llog_ctxt *ctxt =
4235                         llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4236                 if (ctxt)
4237                         llog_cleanup(ctxt);
4238                 CERROR("failed LLOG_SIZE_REPL_CTXT\n");
4239         }
4240         GOTO(out, rc);
4241 out:
4242         if (rc) {
4243                 CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
4244                        obd->obd_name, tgt->obd_name, catid, rc);
4245                 CERROR("logid "LPX64":0x%x\n",
4246                        catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
4247         }
4248         return rc;
4249 }
4250
4251 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
4252                          struct obd_device *disk_obd, int *index)
4253 {
4254         struct llog_catid catid;
4255         static char name[32] = CATLIST;
4256         int rc;
4257         ENTRY;
4258
4259         LASSERT(olg == &obd->obd_olg);
4260
4261         cfs_mutex_lock(&olg->olg_cat_processing);
4262         rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
4263         if (rc) {
4264                 CERROR("rc: %d\n", rc);
4265                 GOTO(out, rc);
4266         }
4267
4268         CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
4269                obd->obd_name, *index, catid.lci_logid.lgl_oid,
4270                catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
4271
4272         rc = __osc_llog_init(obd, olg, disk_obd, &catid);
4273         if (rc) {
4274                 CERROR("rc: %d\n", rc);
4275                 GOTO(out, rc);
4276         }
4277
4278         rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
4279         if (rc) {
4280                 CERROR("rc: %d\n", rc);
4281                 GOTO(out, rc);
4282         }
4283
4284  out:
4285         cfs_mutex_unlock(&olg->olg_cat_processing);
4286
4287         return rc;
4288 }
4289
4290 static int osc_llog_finish(struct obd_device *obd, int count)
4291 {
4292         struct llog_ctxt *ctxt;
4293         int rc = 0, rc2 = 0;
4294         ENTRY;
4295
4296         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
4297         if (ctxt)
4298                 rc = llog_cleanup(ctxt);
4299
4300         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4301         if (ctxt)
4302                 rc2 = llog_cleanup(ctxt);
4303         if (!rc)
4304                 rc = rc2;
4305
4306         RETURN(rc);
4307 }
4308
4309 static int osc_reconnect(const struct lu_env *env,
4310                          struct obd_export *exp, struct obd_device *obd,
4311                          struct obd_uuid *cluuid,
4312                          struct obd_connect_data *data,
4313                          void *localdata)
4314 {
4315         struct client_obd *cli = &obd->u.cli;
4316
4317         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
4318                 long lost_grant;
4319
4320                 client_obd_list_lock(&cli->cl_loi_list_lock);
4321                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
4322                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
4323                 lost_grant = cli->cl_lost_grant;
4324                 cli->cl_lost_grant = 0;
4325                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4326
4327                 CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
4328                        "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
4329                        cli->cl_avail_grant, cli->cl_dirty, lost_grant);
4330                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
4331                        " ocd_grant: %d\n", data->ocd_connect_flags,
4332                        data->ocd_version, data->ocd_grant);
4333         }
4334
4335         RETURN(0);
4336 }
4337
4338 static int osc_disconnect(struct obd_export *exp)
4339 {
4340         struct obd_device *obd = class_exp2obd(exp);
4341         struct llog_ctxt  *ctxt;
4342         int rc;
4343
4344         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
4345         if (ctxt) {
4346                 if (obd->u.cli.cl_conn_count == 1) {
4347                         /* Flush any remaining cancel messages out to the
4348                          * target */
4349                         llog_sync(ctxt, exp);
4350                 }
4351                 llog_ctxt_put(ctxt);
4352         } else {
4353                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
4354                        obd);
4355         }
4356
4357         rc = client_disconnect_export(exp);
4358         /**
4359          * Initially we put del_shrink_grant before disconnect_export, but it
4360          * causes the following problem if setup (connect) and cleanup
4361          * (disconnect) are tangled together.
4362          *      connect p1                     disconnect p2
4363          *   ptlrpc_connect_import
4364          *     ...............               class_manual_cleanup
4365          *                                     osc_disconnect
4366          *                                     del_shrink_grant
4367          *   ptlrpc_connect_interrupt
4368          *     init_grant_shrink
4369          *   add this client to shrink list
4370          *                                      cleanup_osc
4371          * Bang! pinger trigger the shrink.
4372          * So the osc should be disconnected from the shrink list, after we
4373          * are sure the import has been destroyed. BUG18662
4374          */
4375         if (obd->u.cli.cl_import == NULL)
4376                 osc_del_shrink_grant(&obd->u.cli);
4377         return rc;
4378 }
4379
4380 static int osc_import_event(struct obd_device *obd,
4381                             struct obd_import *imp,
4382                             enum obd_import_event event)
4383 {
4384         struct client_obd *cli;
4385         int rc = 0;
4386
4387         ENTRY;
4388         LASSERT(imp->imp_obd == obd);
4389
4390         switch (event) {
4391         case IMP_EVENT_DISCON: {
4392                 /* Only do this on the MDS OSC's */
4393                 if (imp->imp_server_timeout) {
4394                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4395
4396                         cfs_spin_lock(&oscc->oscc_lock);
4397                         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
4398                         cfs_spin_unlock(&oscc->oscc_lock);
4399                 }
4400                 cli = &obd->u.cli;
4401                 client_obd_list_lock(&cli->cl_loi_list_lock);
4402                 cli->cl_avail_grant = 0;
4403                 cli->cl_lost_grant = 0;
4404                 client_obd_list_unlock(&cli->cl_loi_list_lock);
4405                 break;
4406         }
4407         case IMP_EVENT_INACTIVE: {
4408                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
4409                 break;
4410         }
4411         case IMP_EVENT_INVALIDATE: {
4412                 struct ldlm_namespace *ns = obd->obd_namespace;
4413                 struct lu_env         *env;
4414                 int                    refcheck;
4415
4416                 env = cl_env_get(&refcheck);
4417                 if (!IS_ERR(env)) {
4418                         /* Reset grants */
4419                         cli = &obd->u.cli;
4420                         client_obd_list_lock(&cli->cl_loi_list_lock);
4421                         /* all pages go to failing rpcs due to the invalid
4422                          * import */
4423                         osc_check_rpcs(env, cli);
4424                         client_obd_list_unlock(&cli->cl_loi_list_lock);
4425
4426                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
4427                         cl_env_put(env, &refcheck);
4428                 } else
4429                         rc = PTR_ERR(env);
4430                 break;
4431         }
4432         case IMP_EVENT_ACTIVE: {
4433                 /* Only do this on the MDS OSC's */
4434                 if (imp->imp_server_timeout) {
4435                         struct osc_creator *oscc = &obd->u.cli.cl_oscc;
4436
4437                         cfs_spin_lock(&oscc->oscc_lock);
4438                         oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
4439                                               OSCC_FLAG_NOSPC_BLK);
4440                         cfs_spin_unlock(&oscc->oscc_lock);
4441                 }
4442                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
4443                 break;
4444         }
4445         case IMP_EVENT_OCD: {
4446                 struct obd_connect_data *ocd = &imp->imp_connect_data;
4447
4448                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
4449                         osc_init_grant(&obd->u.cli, ocd);
4450
4451                 /* See bug 7198 */
4452                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
4453                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
4454
4455                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
4456                 break;
4457         }
4458         case IMP_EVENT_DEACTIVATE: {
4459                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
4460                 break;
4461         }
4462         case IMP_EVENT_ACTIVATE: {
4463                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
4464                 break;
4465         }
4466         default:
4467                 CERROR("Unknown import event %d\n", event);
4468                 LBUG();
4469         }
4470         RETURN(rc);
4471 }
4472
4473 /**
4474  * Determine whether the lock can be canceled before replaying the lock
4475  * during recovery, see bug16774 for detailed information.
4476  *
4477  * \retval zero the lock can't be canceled
4478  * \retval other ok to cancel
4479  */
4480 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
4481 {
4482         check_res_locked(lock->l_resource);
4483
4484         /*
4485          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
4486          *
4487          * XXX as a future improvement, we can also cancel unused write lock
4488          * if it doesn't have dirty data and active mmaps.
4489          */
4490         if (lock->l_resource->lr_type == LDLM_EXTENT &&
4491             (lock->l_granted_mode == LCK_PR ||
4492              lock->l_granted_mode == LCK_CR) &&
4493             (osc_dlm_lock_pageref(lock) == 0))
4494                 RETURN(1);
4495
4496         RETURN(0);
4497 }
4498
4499 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
4500 {
4501         struct client_obd *cli = &obd->u.cli;
4502         int rc;
4503         ENTRY;
4504
4505         ENTRY;
4506         rc = ptlrpcd_addref();
4507         if (rc)
4508                 RETURN(rc);
4509
4510         rc = client_obd_setup(obd, lcfg);
4511         if (rc == 0) {
4512                 void *handler;
4513                 handler = ptlrpcd_alloc_work(cli->cl_import,
4514                                              brw_queue_work, cli);
4515                 if (!IS_ERR(handler))
4516                         cli->cl_writeback_work = handler;
4517                 else
4518                         rc = PTR_ERR(handler);
4519         }
4520
4521         if (rc == 0) {
4522                 struct lprocfs_static_vars lvars = { 0 };
4523
4524                 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
4525                 lprocfs_osc_init_vars(&lvars);
4526                 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
4527                         lproc_osc_attach_seqstat(obd);
4528                         sptlrpc_lprocfs_cliobd_attach(obd);
4529                         ptlrpc_lprocfs_register_obd(obd);
4530                 }
4531
4532                 oscc_init(obd);
4533                 /* We need to allocate a few requests more, because
4534                    brw_interpret tries to create new requests before freeing
4535                    previous ones. Ideally we want to have 2x max_rpcs_in_flight
4536                    reserved, but I afraid that might be too much wasted RAM
4537                    in fact, so 2 is just my guess and still should work. */
4538                 cli->cl_import->imp_rq_pool =
4539                         ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
4540                                             OST_MAXREQSIZE,
4541                                             ptlrpc_add_rqs_to_pool);
4542
4543                 CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
4544
4545                 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
4546         }
4547
4548         if (rc)
4549                 ptlrpcd_decref();
4550         RETURN(rc);
4551 }
4552
4553 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
4554 {
4555         int rc = 0;
4556         ENTRY;
4557
4558         switch (stage) {
4559         case OBD_CLEANUP_EARLY: {
4560                 struct obd_import *imp;
4561                 imp = obd->u.cli.cl_import;
4562                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
4563                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
4564                 ptlrpc_deactivate_import(imp);
4565                 cfs_spin_lock(&imp->imp_lock);
4566                 imp->imp_pingable = 0;
4567                 cfs_spin_unlock(&imp->imp_lock);
4568                 break;
4569         }
4570         case OBD_CLEANUP_EXPORTS: {
4571                 struct client_obd *cli = &obd->u.cli;
4572                 /* LU-464
4573                  * for echo client, export may be on zombie list, wait for
4574                  * zombie thread to cull it, because cli.cl_import will be
4575                  * cleared in client_disconnect_export():
4576                  *   class_export_destroy() -> obd_cleanup() ->
4577                  *   echo_device_free() -> echo_client_cleanup() ->
4578                  *   obd_disconnect() -> osc_disconnect() ->
4579                  *   client_disconnect_export()
4580                  */
4581                 obd_zombie_barrier();
4582                 if (cli->cl_writeback_work) {
4583                         ptlrpcd_destroy_work(cli->cl_writeback_work);
4584                         cli->cl_writeback_work = NULL;
4585                 }
4586                 obd_cleanup_client_import(obd);
4587                 ptlrpc_lprocfs_unregister_obd(obd);
4588                 lprocfs_obd_cleanup(obd);
4589                 rc = obd_llog_finish(obd, 0);
4590                 if (rc != 0)
4591                         CERROR("failed to cleanup llogging subsystems\n");
4592                 break;
4593                 }
4594         }
4595         RETURN(rc);
4596 }
4597
4598 int osc_cleanup(struct obd_device *obd)
4599 {
4600         int rc;
4601
4602         ENTRY;
4603
4604         /* free memory of osc quota cache */
4605         osc_quota_cleanup(obd);
4606
4607         rc = client_obd_cleanup(obd);
4608
4609         ptlrpcd_decref();
4610         RETURN(rc);
4611 }
4612
4613 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
4614 {
4615         struct lprocfs_static_vars lvars = { 0 };
4616         int rc = 0;
4617
4618         lprocfs_osc_init_vars(&lvars);
4619
4620         switch (lcfg->lcfg_command) {
4621         default:
4622                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
4623                                               lcfg, obd);
4624                 if (rc > 0)
4625                         rc = 0;
4626                 break;
4627         }
4628
4629         return(rc);
4630 }
4631
4632 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
4633 {
4634         return osc_process_config_base(obd, buf);
4635 }
4636
4637 struct obd_ops osc_obd_ops = {
4638         .o_owner                = THIS_MODULE,
4639         .o_setup                = osc_setup,
4640         .o_precleanup           = osc_precleanup,
4641         .o_cleanup              = osc_cleanup,
4642         .o_add_conn             = client_import_add_conn,
4643         .o_del_conn             = client_import_del_conn,
4644         .o_connect              = client_connect_import,
4645         .o_reconnect            = osc_reconnect,
4646         .o_disconnect           = osc_disconnect,
4647         .o_statfs               = osc_statfs,
4648         .o_statfs_async         = osc_statfs_async,
4649         .o_packmd               = osc_packmd,
4650         .o_unpackmd             = osc_unpackmd,
4651         .o_precreate            = osc_precreate,
4652         .o_create               = osc_create,
4653         .o_create_async         = osc_create_async,
4654         .o_destroy              = osc_destroy,
4655         .o_getattr              = osc_getattr,
4656         .o_getattr_async        = osc_getattr_async,
4657         .o_setattr              = osc_setattr,
4658         .o_setattr_async        = osc_setattr_async,
4659         .o_brw                  = osc_brw,
4660         .o_punch                = osc_punch,
4661         .o_sync                 = osc_sync,
4662         .o_enqueue              = osc_enqueue,
4663         .o_change_cbdata        = osc_change_cbdata,
4664         .o_find_cbdata          = osc_find_cbdata,
4665         .o_cancel               = osc_cancel,
4666         .o_cancel_unused        = osc_cancel_unused,
4667         .o_iocontrol            = osc_iocontrol,
4668         .o_get_info             = osc_get_info,
4669         .o_set_info_async       = osc_set_info_async,
4670         .o_import_event         = osc_import_event,
4671         .o_llog_init            = osc_llog_init,
4672         .o_llog_finish          = osc_llog_finish,
4673         .o_process_config       = osc_process_config,
4674         .o_quotactl             = osc_quotactl,
4675         .o_quotacheck           = osc_quotacheck,
4676         .o_quota_adjust_qunit   = osc_quota_adjust_qunit,
4677 };
4678
4679 extern struct lu_kmem_descr osc_caches[];
4680 extern cfs_spinlock_t       osc_ast_guard;
4681 extern cfs_lock_class_key_t osc_ast_guard_class;
4682
4683 int __init osc_init(void)
4684 {
4685         struct lprocfs_static_vars lvars = { 0 };
4686         int rc;
4687         ENTRY;
4688
4689         /* print an address of _any_ initialized kernel symbol from this
4690          * module, to allow debugging with gdb that doesn't support data
4691          * symbols from modules.*/
4692         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
4693
4694         rc = lu_kmem_init(osc_caches);
4695
4696         lprocfs_osc_init_vars(&lvars);
4697
4698         osc_quota_init();
4699         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
4700                                  LUSTRE_OSC_NAME, &osc_device_type);
4701         if (rc) {
4702                 lu_kmem_fini(osc_caches);
4703                 RETURN(rc);
4704         }
4705
4706         cfs_spin_lock_init(&osc_ast_guard);
4707         cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
4708
4709         osc_mds_ost_orig_logops = llog_lvfs_ops;
4710         osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
4711         osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
4712         osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
4713         osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
4714
4715         RETURN(rc);
4716 }
4717
4718 #ifdef __KERNEL__
4719 static void /*__exit*/ osc_exit(void)
4720 {
4721         lu_device_type_fini(&osc_device_type);
4722
4723         osc_quota_exit();
4724         class_unregister_type(LUSTRE_OSC_NAME);
4725         lu_kmem_fini(osc_caches);
4726 }
4727
4728 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
4729 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
4730 MODULE_LICENSE("GPL");
4731
4732 cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
4733 #endif