Slim Framework RabbitMQ

news/2024/7/5 4:57:25

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

关于 illuminate/database 的注册部分参考这个即可

下面我要另外说明的一点是, 在 settings 中对 database 的部分有一些区别,主要是为了

区分正式数据库和测试数据库。

<?php

$remote_addr = empty($_SERVER['REMOTE_ADDR']) ? "" : $_SERVER['REMOTE_ADDR'];
return [
    'settings' => [
        // Database settings
        'database' => [
            'driver'    => getenv('DB_DRIVER'), //mysql
            'host'      => getenv('DB_HOST'), //ip地址
            'database'  => ($remote_addr == getenv('DB_HOST')) ? getenv('DB_NAME') : getenv('DB_NAME_TEST'), // 这里我想用 REMOTE_ADDR 来区分 正式数据库 和 测试数据库,暂时不考虑反向代理的问题
            'username'  => getenv('DB_USER'),//用户名
            'password'  => getenv('DB_PASSWORD'),//密码
            'port'      => getenv('DB_PORT'),//端口
            'charset'   => getenv('DB_CHARSET'),//utf8
            'collation' => getenv('DB_COLLATION'),//utf8_unicode_ci
            'prefix'    => getenv('DB_PREFIX'),//prefix_
        ]
    ],
];

这样我们就简单的实现了正式数据库和测试数据库的区分。

然后在 Slim 框架中注册我们的 RabbitMQ 依赖。这里我采用了一个相对统一的操作消息队列的包来处理消息队列。注册之前我们先

安装相关扩展和包文件。

amqp扩展安装 Communicate with any AMQP compliant server

amqp 包

AMQP transport

安装完成之后,我们就可以来添加依赖了。

在 Slim 框架 DI 中添加依赖。

<?php

use Enqueue\AmqpExt\AmqpConnectionFactory;

$container['amqpfactory'] = function () {
    $factory = new AmqpConnectionFactory([
        'host'      => 'localhost',
        'port'      => 5672,
        'vhost'     => '/',
        'user'      => '你的 rabbitmq 用户',
        'pass'      => '你的 rabbitmq 密码',
        'persisted' => false,
    ]);
    return $factory;
};

然后在基类中声明类成员属性 $amqpfactory; 并用phpdoc var 来指定属性所属类文件 同样这里我们可以采用直接声明赋值属性,这种类型的属性声明要用protected来声明属性。

直接声明加载依赖

<?php

namespace 你的文件命名空间;

use Interop\Container\ContainerInterface;

class BaseController
{
    /** @var \Interop\Container\ContainerInterface */
    protected $container;

    /** @var \Enqueue\AmqpExt\AmqpConnectionFactory */
    protected $amqpfactory;

    /**
     * BaseController constructor.
     *
     * @param \Interop\Container\ContainerInterface $container
     */
    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
        $this->amqpfactory = $this->container->get('amqpfactory');
    }
}

也可以用

php magic methods

方式来在代码中加载属性 (private) 指向的实例。

<?php

namespace 你的文件命名空间;

use Interop\Container\ContainerInterface;

class BaseController
{
    /** @var \Interop\Container\ContainerInterface */
    protected $container;

    /** @var \Enqueue\AmqpExt\AmqpConnectionFactory */
    private $amqpfactory;

    /**
     * BaseController constructor.
     *
     * @param \Interop\Container\ContainerInterface $container
     */
    public function __construct(ContainerInterface $container)
    {
        $this->container = $container;
    }

    /**
     * @param $name
     * @return mixed
     */
    public function __isset ($name)
    {
        return $this->container->{$name};
    }

    /**
     * @param $name
     * @param $value
     * @return mixed
     */
    public function __set ($name, $value)
    {
        return $this->container->{$name} = $value;
    }

    /**
     * @param $name
     * @return mixed
     */
    public function __get ($name)
    {
        return $this->container->{$name};
    }
}

两种方式调用方式相同

$this->amqpfactory->你的方法……

基础工作基本完成,现在来描述一下我的具体

需求

当用户搜索关键字,这个关键字假定说是一条公司记录,同时我们有一块区域是用于展示热门搜索的,那这个热门搜索的定义我们是来按该公司被用户检索过的记录次数来区分的,被检索的越多次说明该公司热门程度越高。

那需求有了,接下来我们需要分析,

1 对于这个需求有几种合适的解决方案?

2 这些方案分别有什么利各弊?

3 我们会选择哪种方案来满足这个需求?

4 还有哪些能改进的地方?

那思路有了,具体的分析大家可以结全自身来具体分析,这里我就不浪费大家保贵的时间来序述了。 在这里我只描述一点我对于这个需求的解决方案。这里我采用了rabbitmq消息队列来处理。之所以采用这个方案一方面是为了保证前端接收数据的及时,另一方面也是保证数据库负载的有效降压。

假设下面就是你的业务代码,主要分为两部分,一部分用于将查询出来的结果 pulisher 到消息队列中,另一部分是用于 consumer 丛消息队列中取出数据进行处理。

Puliser 部分

<?php

namespace 你的命名空间;

use Interop\Container\ContainerInterface;
use Conduit\Controllers\BaseController;
use Interop\Amqp\AmqpTopic;
use Slim\Http\Request;
use Slim\Http\Response;

class CompanyController extends BaseController
{
    public function __construct(ContainerInterface $container)
    {
        parent::__construct($container);
        // check database is development or produce
    }

    public function homeSearchIndex(Request $request, Response $response, array $args)
    {
        // $company 假设是你查询出来的结果,这里因为我用的 Eloquent ORM 所以查询出来的结果是集合 collection 的形式,用 pluck 处理得到所有的公司 id,在将数据转换为 json 格式

        // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
        // amqp message body
        $mqbody = $company->pluck('id')->toJson(); // $mqbody 为查询出的所有符合条件的公司 id 的 json 数组

        // pulisher mqbody to rabbitmq
        $psrContext = $this->amqpfactory->createContext();
        // Declare topic
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        $topic = $psrContext->createTopic('homepagesearch');
        $topic->setType(AmqpTopic::TYPE_FANOUT);
        $psrContext->declareTopic($topic);
        // Send message to topic
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        /** @var \Interop\Amqp\Impl\AmqpTopic $topic */
        $message = $psrContext->createMessage($mqbody);
        $psrContext->createProducer()->send($topic, $message);
        // Close
        $psrContext->close();
        // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++

        // …… 其他你的后置操作
    }
}

然后我们编写,消息处理部分也就是我们的消费者。这里需要我们先安装一个 php cli 模式下运行的包,这个包主要用于和我们框架的集成,如果不运用框架本身的一些配置,单独做也可以,这里我就集成到了框架中。

adrianfalleiro/slim-cli-runner

Consumer

<?php

namespace 你的命名空间;

use Conduit\Controllers\BaseController;
use Conduit\Models\Company;
use Interop\Container\ContainerInterface;
use Interop\Amqp\AmqpTopic;
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\Impl\AmqpBind;

class HomePageSearchTask extends BaseController
{
    public function __construct(ContainerInterface $container)
    {
        parent::__construct($container);
    }

    public function command()
    {
        // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
        // consumer
        $psrContext = $this->amqpfactory->createContext();
        // Declare topic
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        $topic = $psrContext->createTopic('homepagesearch');
        $topic->setType(AmqpTopic::TYPE_FANOUT);
        $psrContext->declareTopic($topic);
        // Declare queue
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        $queue = $psrContext->createQueue('homepagesearch');
        $queue->addFlag(AmqpQueue::FLAG_DURABLE);
        $psrContext->declareQueue($queue);
        // Bind queue to topic
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        /** @var \Interop\Amqp\Impl\AmqpQueue $queue */
        /** @var \Interop\Amqp\Impl\AmqpTopic $topic */
        $psrContext->bind(new AmqpBind($topic, $queue));
        // Consume message
        /** @var \Enqueue\AmqpExt\AmqpContext $psrContext */
        /** @var \Interop\Amqp\Impl\AmqpQueue $queue */
        $consumer = $psrContext->createConsumer($queue);
        while (true) {
            if ($message = $consumer->receive()) {
                $bodyarr = json_decode($message->getBody(), true); // 还记得上面我们的 publisher的数据格式么?json的需要解析一下
                // process a message 这里你需要针对你自身需求来定制你的处理方式
                Company::query()->whereIn("id", $bodyarr)->increment("search");
                $consumer->acknowledge($message);
            }
        }
        // Close content
        $psrContext->close();
        // +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    }
}

那到现在,当你运行查询的时候你的 publisher exchange 将会创建。

那 consumer 又该如何运行起来呢?还记得我们安装的 slim-cli-runner 包文件么?现在需要用到它的时候 了。

php 你的框架项目绝对路径/public/index.php HomePageSearchTask > /dev/null 2>&1 &

这样就运行起来我们的 consumer 来处理队列中的消息了。

那还有一点问题就是,当我关掉终端窗口的时候这个 consumer 该如何做为守户进程执行。这里有好多监控进程的方案,像 supervisor,crontab等,因为这块我也没下功夫研究,

采用了 crontab 的方式来监控。

首先我们启动一个进程。

# setsid php 你的框架项目绝对路径/public/index.php HomePageSearchTask  > /dev/null 2>&1 &

此时进程状态返回值为 0

# ps aux|grep HomePageSearchTask | grep -v grep|wc -l

创建我们要执行的 shell 脚本

#!/bin/bash
alive=`ps aux|grep HomePageSearchTask | grep -v grep|wc -l`
if [ $alive -eq 0 ]
then
nohup /usr/local/php/bin/php 你的框架项目绝对路径/public/index.php HomePageSearchTask > /dev/null 2>&1 &
fi

修改 shell 脚本执行权限

# chomod +x HomePageSearchTask.sh

手动执行 sh 可检测书写是否有语法错误

sh HomePageSearchTask .sh

创建定时任务脚本,每 8 分钟执行检测

# crontab -e
0-59/8 * * * * /root/HomePageSearchTask.sh > /dev/null 2>&1 &

等待,返回值为 1,则说明我们的 HomePageSearch 进程在成功运行

# ps aux|grep HomePageSearchTask | grep -v grep|wc -l

这里面还有一个

小插曲

就是我上面不是用 REMOTE_ADDR 来测试执行的是哪个数据库么?正式的还是测试的数据库,

那在本地测试的话 consumer 处理的数据是处理到了测试数据库,这也是我想要的,

但是当我执行的 publiser 是正式数据库的时候,consumer 处理的数据还是处理到了测试数据库中,

几乎把所有能调试的地方我都调试了,没找到问题。

后来我发现在我 publiser 到队列中消息体的时候做了下 toJson()的转换,这个问题竟然解决了,但是我之前也是用 json_encode($mqbody) 来处理的啊,那这个 toJson 和 json_encode() 的效果应该是一样的啊,一直到现在我都不太清楚到底是为什么会造成执行数据库不一致的问题,

初步猜想

1 可能是这个 REMOTE_ADDR 的问题,但是都打印测试查看过的,没有问题

2 发布消息体到队列的时候,消息体内容格式没有做好处理,因为我就是把消息内容主体处理了下就好了

但我想事情肯定没有这么简单,如果有知道的大佬,还请多多指教啊。小弟在这里先行答谢了。

转载于:https://my.oschina.net/yeahlife/blog/1817357


http://www.niftyadmin.cn/n/3090024.html

相关文章

暑假周总结八9.2

开学了&#xff0c;假期也就结束了&#xff0c;大三了&#xff0c;对上学还是恐惧&#xff0c;开学有很多课&#xff0c;双学位也不想放弃&#xff0c;系主任的暑假任务也没完成&#xff0c;成功不看有多聪明&#xff0c;看有多大的毅力&#xff0c;毅力是没有的&#xff0c;有…

更新mysql表数据alter_Mysql数据库alter修改表的所有场景和用法

如果你想要修改表的信息&#xff0c;你会发现alter很强大。我们可以看到这样一张表。CREATE TABLE score (student_id int(10) unsigned NOT NULL,event_id int(10) unsigned NOT NULL,score int(11) DEFAULT NULL,PRIMARY KEY (event_id,student_id),KEY student_id (student_…

(转)深入剖析C#继承机制

一. 继承基础知识 为了提高软件模块的可复用性和可扩充性&#xff0c;以便提高软件的开发效率&#xff0c;我们总是希望能够利用前人或自己以前的开发成果&#xff0c;同时又希望在自己的开发过程中能够有足够的灵活性&#xff0c;不拘泥于复用的模块。C#这种完全面向对象的程序…

centos7之docker安装

下午四点左右&#xff0c;我准备接触docker这个技术。之所以接触它&#xff0c;原因来自tomcat服务器老是挂&#xff0c;也不能说老是挂&#xff0c;一周一次吧&#xff0c;或者不定时&#xff0c;最初出现的问题&#xff0c;分为这么几类&#xff1f; 一类&#xff0c;java代码…

数据库CRUD操作以及MyBatis的配置使用

• 业务字段设计 • 数据库创建 • CRUD操作 • MyBatis集成• 注解和XML定义• ViewObject和DateTool • 首页开发• 业务字段设计实体&#xff1a; name:logub_ticket lable: id user_id ticket expired status • 数据库创建 GUI版本管理工具创建&#xff0c;然后通过GUI转…

java页面乱码_Java Web项目中解决中文乱码方法

Java Web项目中解决中文乱码方法Java具有简单性、面向对象、分布式、健壮性、安全性、平台的独立与可移植性、多线程、动态性等特点。下文是为大家精选的Java Web项目中解决中文乱码方法&#xff0c;欢迎大家阅读参考。第一种情况&#xff1a;调用jsp页面中文显示乱码问题描述&…

手动卸载windows服务

使用windows命令行工具&#xff1a;sc。大名鼎鼎的sc。欲知详细用法&#xff0c;请直接在cmd下键入sc&#xff0c;即显示用法。嘿嘿&#xff0c;很好用的&#xff01;另外&#xff0c;MySql的服务注册其实很简单&#xff0c;即简单地 D:\mysql\bin>mysqld -install Service…

捷账宝常规操作

1.安装流程按“小房子”形状的按钮&#xff0c;进入设备桌面—>打开“拉卡拉应用商店”—>点击左上角搜索按钮&#xff0c;输入“捷账宝”(字不能错&#xff0c;必须一定是这三个字)—>点击右边搜索—>点击“捷账宝”后面的“下载”&#xff0c;下载完成后—>安…