11const kafka_log = require ( 'debug' ) ( 'goftare:kafka' ) ;
22const winston = require ( '../utils/logger' ) ;
33const validator = require ( '../utils/validator' ) ;
4- // const { producer } = require('../utils/kafka');
4+ const { producer, km} = require ( '../utils/kafka' ) ;
5+ const mongoose = require ( 'mongoose' ) ;
6+ const moment = require ( 'moment-jalaali' ) ;
57const room_dao = require ( '../dao/room_dao' ) ;
68const user_dao = require ( '../dao/user_dao' ) ;
79const config = require ( '../config/config' ) ;
@@ -38,8 +40,15 @@ exports.joinRoom = async (io, socket, data, callback) => {
3840const members = await room_dao . joinSocketToRoom ( room_id , socket . id , joined_id ) ;
3941if ( members ) {
4042socket . join ( room_id ) ;
41- //also done with broadcast
42- io . in ( room_id ) . emit ( 'joined_room' , Object . assign ( { } , config . RESPONSE , {
43+ /**
44+ *@memberof socket_handler - this is a backend event
45+ *@event joined_room
46+ *@emits joined_id - the joined user id
47+ *@emits members - the array of members objects
48+ *@description the answer of the join room is an event named joined_room
49+ * which gives back the joined id and the "online" members
50+ */
51+ io . to ( room_id ) . emit ( 'joined_room' , Object . assign ( { } , config . RESPONSE , {
4352message :'successful' ,
4453data :{
4554joined_id,
@@ -83,7 +92,7 @@ exports.createRoom = async (io, socket, data, callback) => {
8392message :'input is not valid' ,
8493data :valid . error
8594} ) ) ;
86- } else {
95+ } else {
8796const sockets = [ ] ;
8897
8998const saved = await room_dao . joinSocketToRoom ( _id , socket . id , creator_id ) ;
@@ -96,24 +105,54 @@ exports.createRoom = async (io, socket, data, callback) => {
96105}
97106}
98107}
99-
108+
100109// const created = await room_dao.saveRoom(room_id, sockets);
101110kafka_log ( '........................\n' + sockets + '\n.............................' ) ;
102- if ( sockets . length !== 0 ) {
103- for ( const socket of sockets ) {
104- io . to ( socket ) . emit ( 'new_room' , Object . assign ( { } , config . RESPONSE , {
105- _id,
106- members
111+ const keyedMessage = new km ( 'readMessage' , JSON . stringify ( {
112+ text :'room_created' ,
113+ room_id :_id ,
114+ timestamp :new Date ( ) . getTime ( ) ,
115+ moment :moment ( ) . format ( 'jYYYY/jMM/jDD HH:mm:ss' ) ,
116+ sender_id :creator_id ,
117+ message_id :mongoose . Types . ObjectId ( )
118+ } ) ) ;
119+ producer . send ( [ Object . assign ( config . PAYLOAD , {
120+ messages :[ keyedMessage ]
121+ } ) ] , function ( e , result ) {
122+ if ( e ) {
123+ callback ( Object . assign ( { } , config . RESPONSE , {
124+ result :false ,
125+ message :'kafka error' ,
126+ data :e
107127} ) ) ;
128+ } else {
129+ if ( sockets . length !== 0 ) {
130+ for ( const socket of sockets ) {
131+ /**
132+ *@memberof socket_handler - this is a backend event
133+ *@event new_room
134+ *@emits _id - the room id
135+ *@emits members - new room members
136+ *@description - this event indicates that a new room is
137+ * created and is supposed to emit a "join_room" after reciving
138+ */
139+ io . to ( socket ) . emit ( 'new_room' , Object . assign ( { } , config . RESPONSE , {
140+ _id,
141+ members
142+ } ) ) ;
143+ }
144+ callback ( Object . assign ( { } , config . RESPONSE , {
145+ message :'room_created' ,
146+ data :result
147+ } ) ) ;
148+
149+ } else
150+ callback ( Object . assign ( { } , config . RESPONSE , {
151+ result :false ,
152+ message :'failed to create the room by socket'
153+ } ) ) ;
108154}
109- callback ( Object . assign ( { } , config . RESPONSE , {
110- message :'room_created'
111- } ) ) ;
112- } else
113- callback ( Object . assign ( { } , config . RESPONSE , {
114- result :false ,
115- message :'failed to create the room by socket'
116- } ) ) ;
155+ } ) ;
117156}
118157} catch ( e ) {
119158callback ( Object . assign ( { } , config . RESPONSE , {
@@ -140,6 +179,12 @@ exports.disconnecting = async (io, socket) => {
140179try {
141180const removedSocket = await room_dao . removeSocketFromRoom ( rooms , socket . id ) ;
142181for ( const room of removedSocket . rooms ) {
182+ /**
183+ *@memberof socket_handler
184+ *@event disconnected - this event happens when a user is disconnected from socket_io
185+ *@emits sender_id - id of the user who is disconnected
186+ *@description - user is disconnecting socket_io
187+ */
143188io . to ( room ) . emit ( 'disconnected' , Object . assign ( { } , config . RESPONSE , {
144189message :'user disconnected' ,
145190data :{