scratch – Blame information for rev

Subversion Repositories:
Rev:
Rev Author Line No. Line
115 office 1 <?php
2  
3 /*
4 * This file is part of the Monolog package.
5 *
6 * (c) Jordi Boggiano <j.boggiano@seld.be>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11  
12 namespace Monolog\Handler;
13  
14 use Monolog\Logger;
15 use Monolog\Formatter\JsonFormatter;
16 use PhpAmqpLib\Message\AMQPMessage;
17 use PhpAmqpLib\Channel\AMQPChannel;
18 use AMQPExchange;
19  
class AmqpHandler extends AbstractProcessingHandler
{
    /**
     * Publishing target: either an AMQPExchange (php AMQP extension) or an
     * AMQPChannel (php-amqplib).
     *
     * @var AMQPExchange|AMQPChannel $exchange
     */
    protected $exchange;

    /**
     * Exchange name passed to the channel on every publish. It is only
     * assigned when the handler was constructed with an AMQPChannel.
     *
     * @var string
     */
    protected $exchangeName;

    /**
     * @param AMQPExchange|AMQPChannel $exchange     AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
     * @param string                   $exchangeName Name set on the AMQPExchange, or handed to the channel when publishing
     * @param int                      $level        Level forwarded to the parent handler constructor
     * @param bool                     $bubble       Whether the messages that are handled can bubble up the stack or not
     *
     * @throws \InvalidArgumentException When $exchange is neither an AMQPExchange nor an AMQPChannel
     */
    public function __construct($exchange, $exchangeName = 'log', $level = Logger::DEBUG, $bubble = true)
    {
        $isExtensionExchange = $exchange instanceof AMQPExchange;

        // Reject anything that is not one of the two supported transports.
        if (!$isExtensionExchange && !($exchange instanceof AMQPChannel)) {
            throw new \InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
        }

        if ($isExtensionExchange) {
            // The extension's exchange object carries its own name.
            $exchange->setName($exchangeName);
        } else {
            // A channel has no exchange name of its own; remember it for publish calls.
            $this->exchangeName = $exchangeName;
        }

        $this->exchange = $exchange;

        parent::__construct($level, $bubble);
    }

    /**
     * {@inheritDoc}
     *
     * Publishes the already formatted record through whichever transport the
     * handler was constructed with.
     */
    protected function write(array $record)
    {
        $payload = $record["formatted"];
        $key = $this->getRoutingKey($record);

        // php-amqplib channel: wrap the payload in an AMQPMessage and publish it.
        if (!($this->exchange instanceof AMQPExchange)) {
            $this->exchange->basic_publish($this->createAmqpMessage($payload), $this->exchangeName, $key);

            return;
        }

        // php AMQP extension: the attributes are passed directly to publish().
        $attributes = array(
            'delivery_mode' => 2,
            'content_type' => 'application/json',
        );

        $this->exchange->publish($payload, $key, 0, $attributes);
    }

    /**
     * {@inheritDoc}
     *
     * With an AMQPExchange the parent implementation is used unchanged. With
     * an AMQPChannel every handled record is queued through
     * batch_basic_publish() and the queue is sent with one publish_batch() call.
     */
    public function handleBatch(array $records)
    {
        if ($this->exchange instanceof AMQPExchange) {
            parent::handleBatch($records);

            return;
        }

        foreach ($records as $entry) {
            // Records this handler does not handle are left out of the batch.
            if ($this->isHandling($entry)) {
                $entry = $this->processRecord($entry);
                $message = $this->createAmqpMessage($this->getFormatter()->format($entry));

                // The routing key is derived from the processed record.
                $this->exchange->batch_basic_publish($message, $this->exchangeName, $this->getRoutingKey($entry));
            }
        }

        $this->exchange->publish_batch();
    }

    /**
     * Gets the routing key for the AMQP exchange
     *
     * The key is the lower-cased string "<level>.<channel>", where <level> is
     * the first four characters of the record's level name.
     *
     * @param  array  $record
     * @return string
     */
    protected function getRoutingKey(array $record)
    {
        // TODO 2.0 remove substr call
        $levelPrefix = substr($record['level_name'], 0, 4);

        return strtolower(sprintf('%s.%s', $levelPrefix, $record['channel']));
    }

    /**
     * Wraps a payload in an AMQPMessage carrying the same delivery_mode and
     * content_type attributes that write() uses for the extension exchange.
     *
     * @param  string      $data
     * @return AMQPMessage
     */
    private function createAmqpMessage($data)
    {
        $properties = array(
            'delivery_mode' => 2,
            'content_type' => 'application/json',
        );

        return new AMQPMessage((string) $data, $properties);
    }

    /**
     * {@inheritDoc}
     *
     * Returns a JsonFormatter constructed with BATCH_MODE_JSON and false as
     * its second argument.
     */
    protected function getDefaultFormatter()
    {
        $formatter = new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false);

        return $formatter;
    }
}