// Select Git revision
// Ccls2Dist.java
// Tomasz Walkowiak authored
// Ccls2Dist.java 8.85 KiB
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package pl.clarin.converter;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import g419.corpus.io.reader.AbstractDocumentReader;
import g419.corpus.io.reader.ReaderFactory;
import g419.corpus.structure.Document;
import g419.corpus.structure.Sentence;
import g419.corpus.structure.Token;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
/**
*
* @author Tomasz Walkowiak
*/
public class Ccls2Dist {

    private static final Logger LOG = Logger.getLogger(Ccls2Dist.class.getName());

    /** Publishing waits while more than this many requests are awaiting a reply. */
    private static final int MAX_PENDING = 50;
    /** Poll timeout (ms) used while waiting for the in-flight count to drop in {@link #makeCall}. */
    private static final int THROTTLE_POLL_MS = 20;
    /** Poll timeout (ms) used after each published request. */
    private static final int INTERLEAVED_POLL_MS = 10;
    /** Poll timeout (ms) used while draining the remaining replies at the end of a run. */
    private static final int DRAIN_POLL_MS = 2000;

    private Channel channel;
    private String replyQueueName;
    /** Number of successfully published requests whose reply has not been consumed yet. */
    private int pending = 0;
    private QueueingConsumer consumer;
    /** Reply payloads collected during the current run. */
    private JSONArray results;
    /** Correlation id -> {"from": ..., "to": ...} descriptor of requests awaiting a reply. */
    private Map<String, JSONObject> fromToMap;
    private final String prefix;
    private final Connection connection;

    /**
     * Creates a converter that talks to the services reachable through {@code c}.
     *
     * @param c       open RabbitMQ connection; only channels created on it are closed here,
     *                never the connection itself
     * @param _prefix prepended to the tool name to form the request queue name
     */
    public Ccls2Dist(Connection c, String _prefix) {
        connection = c;
        prefix = _prefix;
    }

    /** Opens a channel, declares a server-named reply queue and consumes it with auto-ack. */
    private void initConn() throws IOException {
        LOG.debug("Setting up channel..");
        channel = connection.createChannel();
        replyQueueName = channel.queueDeclare().getQueue();
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(replyQueueName, true, consumer);
        LOG.debug("Channel set up...");
    }

    /** Closes the channel opened by {@link #initConn()}, if one exists and is still open. */
    private void closeConn() throws Exception {
        if (channel != null && channel.isOpen()) {
            channel.close();
        }
    }

    /**
     * Publishes {@code call} to the queue {@code prefix + call.tool}, asking for the reply
     * on this run's reply queue under correlation id {@code corrId}.
     *
     * <p>On failure the error is logged and the request is forgotten (removed from
     * {@link #fromToMap}), so no reply is waited for.
     */
    private void callRabbit(JSONObject call, String corrId) {
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        try {
            channel.basicPublish("", prefix + call.getString("tool"), props,
                    call.toString().getBytes(StandardCharsets.UTF_8));
            pending++;
        } catch (Exception ex) {
            // Nothing was published, so no reply will ever carry this id.
            fromToMap.remove(corrId);
            LOG.error("Failed to publish request " + corrId, ex);
        }
    }

    /**
     * Requests the word2vec similarity between the lemma lists of two documents.
     *
     * @param from name of the first document
     * @param to   name of the second document
     * @param oi   {@link #readData(String)} result for {@code from}
     * @param oj   {@link #readData(String)} result for {@code to}
     */
    private void makeCall(String from, String to, JSONObject oi, JSONObject oj) {
        JSONObject call = new JSONObject();
        call.put("task", "similarities");
        call.put("tool", "word2vec");
        call.put("lexeme", "");
        call.put("lexeme1", oi.get("lemmas"));
        call.put("lexeme2", oj.get("lemmas"));
        String corrId = from + "::" + to;
        JSONObject ref = new JSONObject();
        ref.put("from", from);
        ref.put("to", to);
        fromToMap.put(corrId, ref);
        // Back-pressure: wait until no more than MAX_PENDING requests are in flight.
        while (pending > MAX_PENDING) {
            peek(THROTTLE_POLL_MS);
        }
        callRabbit(call, corrId);
    }

    /**
     * Waits up to {@code time} ms for one reply and records it in {@link #results}.
     *
     * <p>The reply's {@code "results"} object gets an {@code "id"} entry holding the
     * from/to descriptor registered for its correlation id. A malformed reply is logged
     * and dropped, but it still counts as answered.
     *
     * @throws IllegalStateException if the thread is interrupted while waiting (the
     *         interrupt flag is restored)
     */
    private void peek(int time) {
        QueueingConsumer.Delivery delivery;
        try {
            delivery = consumer.nextDelivery(time);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for replies", ex);
        }
        // Runtime failures of nextDelivery (e.g. channel shutdown) propagate. Swallowing
        // them made the drain loop in process() spin forever.
        if (delivery == null) {
            return;
        }
        pending--;
        String name = delivery.getProperties().getCorrelationId();
        JSONObject ref = fromToMap.remove(name);
        try {
            String response = new String(delivery.getBody(), StandardCharsets.UTF_8);
            JSONObject fullanswer = new JSONObject(response);
            JSONObject result = fullanswer.getJSONObject("results");
            result.put("id", ref);
            results.put(result);
        } catch (Exception ex) {
            LOG.error("Malformed reply for request " + name, ex);
        }
    }

    /**
     * Compares every pair of CCL files in the directory {@code fileIn} and writes the
     * JSON result (UTF-8) to {@code fileOut}.
     *
     * <p>The output holds {@code "dist"}, the array of similarity replies, and
     * {@code "data"}, which maps each file name to its lemma/count data. If
     * {@code fileIn} is not a directory, {@code {}} is written.
     *
     * @param fileIn  input directory of CCL files
     * @param fileOut output JSON file
     * @param options currently unused
     * @throws Exception on messaging or I/O failure; the channel is closed in any case
     */
    public void process(String fileIn, String fileOut, JSONObject options) throws Exception {
        JSONObject response = new JSONObject();
        File file = new File(fileIn);
        results = new JSONArray();
        fromToMap = new HashMap<>();
        // Each run uses a fresh reply queue, so replies still owed to an earlier
        // aborted run can never arrive here. Forget them.
        pending = 0;
        channel = null;
        try {
            initConn();
            if (file.isDirectory()) {
                compareDirectory(file, response);
            } else {
                LOG.warn(fileIn + " is not a directory; writing an empty result");
            }
        } finally {
            closeConn();
        }
        Files.write(Paths.get(fileOut), response.toString().getBytes(StandardCharsets.UTF_8));
    }

    /** Reads all files of {@code dir}, requests all pairwise similarities and fills {@code response}. */
    private void compareDirectory(File dir, JSONObject response) throws IOException {
        // List the directory once, so the arrays and the loop see the same entries.
        String[] names = dir.list();
        if (names == null) {
            throw new IOException("Cannot list directory " + dir);
        }
        JSONObject[] datas = new JSONObject[names.length];
        for (int i = 0; i < names.length; i++) {
            datas[i] = readData(new File(dir, names[i]).getPath());
            // Pair the new file with every earlier one (each unordered pair once).
            for (int j = 0; j < i; j++) {
                makeCall(names[i], names[j], datas[i], datas[j]);
                peek(INTERLEAVED_POLL_MS);
            }
        }
        while (pending > 0) {
            peek(DRAIN_POLL_MS);
        }
        response.put("dist", results);
        JSONObject data = new JSONObject();
        for (int i = 0; i < names.length; i++) {
            data.put(names[i], datas[i]);
        }
        response.put("data", data);
    }

    /**
     * Maps a token to {@code "<attr1>::verb"} or {@code "<attr1>::noun"} based on the
     * first segment of its disambiguated ctag; other tokens yield {@code null}.
     * NOTE(review): attribute index 1 is presumably the lemma (base form). Confirm
     * against the CCL attribute index.
     */
    private String getToken(Token t) {
        String lemma = t.getAttributeValue(1);
        String pos = t.getDisambTag().getCtag().split(":")[0];
        if (isCzasownik(pos)) {
            return lemma + "::verb";
        }
        if (isRzeczownik(pos)) {
            return lemma + "::noun";
        }
        return null;
    }

    /**
     * Counts the verb/noun keys produced by {@link #getToken(Token)} over the first
     * document of a CCL file.
     *
     * @return key -> occurrence count. It is empty for a {@code null} path or an empty
     *         file. On a read error the failure is logged and the counts gathered so far
     *         are returned.
     */
    private Map<String, Integer> readCCL(String file) {
        Map<String, Integer> map = new HashMap<>();
        if (file == null) {
            return map;
        }
        try {
            AbstractDocumentReader reader = ReaderFactory.get().getStreamReader(file, "ccl");
            Document ps;
            try {
                ps = reader.nextDocument();
            } finally {
                reader.close();
            }
            if (ps == null) {
                LOG.warn("No document found in " + file);
                return map;
            }
            for (Sentence sent : ps.getSentences()) {
                ArrayList<Token> tokens = sent.getTokens();
                for (int i = 0; i < sent.getTokenNumber(); i++) {
                    String el = getToken(tokens.get(i));
                    if (el != null && !el.isEmpty()) {
                        map.merge(el, 1, Integer::sum);
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Failed to read CCL file " + file, e);
        }
        return map;
    }

    /**
     * Builds {@code {"lemmas": [...], "counts": [...]}} for a CCL file. The two arrays
     * are index-aligned (counts[k] belongs to lemmas[k]).
     */
    private JSONObject readData(String file) {
        JSONObject result = new JSONObject();
        JSONArray lemma = new JSONArray();
        JSONArray counts = new JSONArray();
        for (Map.Entry<String, Integer> e : readCCL(file).entrySet()) {
            lemma.put(e.getKey());
            counts.put(e.getValue());
        }
        result.put("lemmas", lemma);
        result.put("counts", counts);
        return result;
    }

    /** Returns whether a ctag's first segment is a noun ("rzeczownik") class. */
    private boolean isRzeczownik(String tag) {
        return tag.equals("subst");
    }

    /**
     * Returns whether a ctag's first segment is a verb ("czasownik") class.
     * "aglt" was deliberately left out by the original author.
     */
    private boolean isCzasownik(String tag) {
        switch (tag) {
            case "bedzie":
            case "fin":
            case "imps":
            case "impt":
            case "inf":
            case "ppas":
            case "praet":
            case "winien":
                return true;
            default:
                return false;
        }
    }

    /**
     * Opens the RabbitMQ connection.
     *
     * <p>The environment variables RABBIT_HOST, RABBIT_USER and RABBIT_PASSWORD override
     * the built-in defaults. NOTE(review): the defaults embed credentials in source and
     * should be removed once all deployments set the variables.
     */
    private static Connection initRabbit() throws IOException, TimeoutException {
        LOG.debug("Init connection ...");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(envOrDefault("RABBIT_HOST", "10.17.0.85"));
        factory.setPassword(envOrDefault("RABBIT_PASSWORD", "clarin123"));
        factory.setUsername(envOrDefault("RABBIT_USER", "clarin"));
        Connection c = factory.newConnection();
        LOG.debug("Connection set up...");
        return c;
    }

    /** Returns the environment variable {@code name}, or {@code fallback} when unset or empty. */
    private static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    /**
     * Runs the conversion.
     *
     * @param args optional {@code [inputDir [outputFile]]}; the defaults are
     *             {@code "4"} and {@code "out.json"}
     */
    public static void main(String[] args) throws Exception {
        pl.clarin.ws.worker.Service.initlogging();
        String in = args.length > 0 ? args[0] : "4";
        String out = args.length > 1 ? args[1] : "out.json";
        Connection c = initRabbit();
        try {
            new Ccls2Dist(c, "lex_").process(in, out, new JSONObject());
        } finally {
            c.close();
        }
    }
}