3.1. 基本使用 使用 SDK访问 LTS 的服务,需要设置正确的 AccessKey、SecretKey 和服务端 Endpo
int,所有的服务可以使用同一 key 凭证来进行访问,但不同的服务需要使用不同的 endpo
int 进行访问,详情参考天翼云官网SDK接入概述。在调用前SDK,需要已知以下参数: 1、云日志服务访问地址。详情请查看访问地址(Endpo
int)。 2、key凭证:accessKey和secretKey 。详情请查看如何获取访问密钥(AK/SK)。 3、日志项目编码:logProject,在使用SDK前,需要确保您有至少一个已经存在的日志项目,日志项目就是您要将日志上传到的地方。 4、日志单元编码:logUn
it,在使用SDK前,需要确保日志项目中有至少一个已经存在的日志单元。 参数 参数类
型 描述 是否必须 endpo
int str
ing 域名 是 accessKey str
ing AccessKey,简称ak 是 secretKey str
ing SecretKey ,简称sk 是 logProject str
ing 日志项目编码 是 logUn
it str
ing 日志单元编码 是 目前通过SDK将日志上传到云日志服务有两种上传形式:同步上传和异步批量上传,详细信息请参考代码内README文档。 同步上传:当调用日志上传接口时,sdk会立即进行http请求调用,并返回发送结果。这种方式结构简单,可用于发送频率不高的场景。 异步批量上传:当调用日志上传接口时,后台线程会将日志进行累积,当达到发送条件时,会进行一次合并发送。对于需要频繁调用发送接口的场景,这种方式性能更卓越,更高效。 示例代码:同步上传 cpp
int ma
in(
int argc, char argv) { str
ing ak "your accessKey"; str
ing sk "your secretKey"; str
ing logProject "log project Code"; str
ing logUn
it "log un
it Code"; str
ing endpo
int "endpo
int";
int lognum 10; //每次发送10条日志 vector logItems; for (
int k 0; k contents; map labels; contents["level"] str
ing("error"); contents["un
it
id"] 3.1415926; labels["usertag"] str
ing("str
ing"); LogItem logItem; logItem.logT
imestamp logT
imestamp; logItem.or
iMessage or
iMessage; logItem.contents contents; logItem.labels labels; logItems.pushback(logItem); } try { Cl
ient cl
ient new Cl
ient(endpo
int, ak, sk); for (
int
i 0;
i PutLogs(logProject, logUn
it, logItems); cout logItems; for (
int k 0; k contents; map labels; contents["level"] str
ing("error"); contents["un
it
id"] 3.1415926; labels["usertag"] str
ing("str
ing"); LogItem logItem; logItem.logT
imestamp logT
imestamp; logItem.or
iMessage or
iMessage; logItem.contents contents; logItem.labels labels; logItems.pushback(logItem); } try { ProducerConf
ig producerConf
ig ProducerConf
ig(); //使用默认producer配置 Cl
ientConf
ig cl
ientConf
ig(endpo
int, ak, sk, logProject);//可使用不同cl
ient配置,生成多个cl
ient,如果希望使用同一个,logProject默认设置为"" 即可 Cl
ient cl
ient createCl
ient(cl
ientConf
ig); map cl
ientPool; cl
ientPool.
insert(makepa
ir(cl
ient>GetLogProject(), cl
ient)); IoWorker
ioWorker(cl
ientPool, producerConf
ig.getMaxWorkerCount()); //线程池 LogAccumulator logAccumulator(producerConf
ig,
ioWorker); //日志累加器 Mover mover(logAccumulator,
ioWorker, producerConf
ig.getL
ingerMs()); //定时器 Producer producer(producerConf
ig,
ioWorker, logAccumulator, mover); //日志发送器 producer.start(); for (
int
i0;
i<5000;
i++){ producer.sendLogsCallback(logProject,logUn
it,logItems); } std::th
isthread::sleepfor(std::chrono::m
ill
iseconds(5000)); for (
int
i 0;
i < 5000;
i++) { producer.sendLogsCallback(logProject, logUn
it, logItems); } std::th
isthread::sleepfor(std::chrono::m
ill
iseconds(3000)); producer.safeClose(); } catch (except
ion &e) { cout<
如何实现RabbitMQ的高性能 使用集群的负载均衡 队列的性能受单个CPU内核控制,当一个RabbitMQ节点处理消息的能力达到瓶颈时,可以通过集群进行扩展,从而达到提升吞吐量的目的。 使用多个节点,集群会自动将队列均衡的创建在各个节点上。除了使用集群模式,您还可以使用以下两个插件优化负载均衡: Consistent hash exchange 该插件使用交换器来平衡队列之间的消息。根据消息的路由键,发送到交换器的消息一致且均匀地分布在多个队列中。该插件创建路由键的散列,并将消息传播到与该交换器具有绑定关系的队列中。使用此插件时,需要确保消费者从所有队列中消费。 使用示例如下: 使用不同的路由键来路由消息。 public class ConsistentHashExchangeExample1 { private static String CONSISTENTHASHEXCHANGETYPE "xconsistenthash"; public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException { ConnectionFactory cf new ConnectionFactory(); Connection conn cf.newConnection(); Channel ch conn.createChannel(); for (String q : Arrays.asList("q1", "q2", "q3", "q4")) { ch.queueDeclare(q, true, false, false, null); ch.queuePurge(q); } ch.exchangeDeclare("e1", CONSISTENTHASHEXCHANGETYPE, true, false, null); for (String q : Arrays.asList("q1", "q2")) { ch.queueBind(q, "e1", "1"); } for (String q : Arrays.asList("q3", "q4")) { ch.queueBind(q, "e1", "2"); } ch.confirmSelect(); AMQP.BasicProperties.Builder bldr new AMQP.BasicProperties.Builder(); for (int i 0; i args new HashMap<>(); args.put("hashheader", "hashon"); ch.exchangeDeclare(EXCHANGE, EXCHANGETYPE, true, false, args); for (String q : Arrays.asList("q1", "q2")) { ch.queueBind(q, EXCHANGE, "1"); } for (String q : Arrays.asList("q3", "q4")) { ch.queueBind(q, EXCHANGE, "2"); } ch.confirmSelect(); for (int i 0; i hdrs new HashMap<>(); hdrs.put("hashon", String.valueOf(i)); ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF8")); } ch.waitForConfirmsOrDie(10000); System.out.println("Done publishing!"); System.out.println("Evaluating results..."); // wait for one stats emission interval so that queue counters // are uptodate in the management UI Thread.sleep(5); System.out.println("Done."); conn.close(); } } 使用消息属性来路由消息,例如messageid、correlationid或timestamp属性。该方式需要使用“hashproperty”参数来声明交换器,且消息必须带有所选择的消息属性,否则会被路由到相同的队列。 public class ConsistentHashExchangeExample2 { public static final String EXCHANGE "e2"; private static String EXCHANGETYPE "xconsistenthash"; public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException { ConnectionFactory cf new ConnectionFactory(); Connection conn cf.newConnection(); Channel ch conn.createChannel(); for (String q : Arrays.asList("q1", "q2", "q3", "q4")) { ch.queueDeclare(q, true, false, false, null); ch.queuePurge(q); } Map args new HashMap<>(); args.put("hashheader", "hashon"); ch.exchangeDeclare(EXCHANGE, EXCHANGETYPE, true, false, args); for (String q : Arrays.asList("q1", "q2")) { ch.queueBind(q, EXCHANGE, "1"); } for (String q : Arrays.asList("q3", "q4")) { ch.queueBind(q, EXCHANGE, "2"); } ch.confirmSelect(); for (int i 0; i hdrs new HashMap<>(); hdrs.put("hashon", String.valueOf(i)); ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF8")); } ch.waitForConfirmsOrDie(10000); System.out.println("Done publishing!"); System.out.println("Evaluating results..."); // wait for one stats emission interval so that queue counters // are uptodate in the management UI Thread.sleep(5); System.out.println("Done."); conn.close(); } }